Skip to main content

surfpool_core/runloops/
mod.rs

1#![allow(unused_imports, dead_code, unused_mut, unused_variables)]
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::SocketAddr,
6    path::PathBuf,
7    sync::{Arc, RwLock},
8    thread::{JoinHandle, sleep},
9    time::{Duration, Instant},
10};
11
12use agave_geyser_plugin_interface::geyser_plugin_interface::{
13    GeyserPlugin, ReplicaBlockInfoV4, ReplicaBlockInfoVersions, ReplicaEntryInfoV2,
14    ReplicaEntryInfoVersions, ReplicaTransactionInfoV3, ReplicaTransactionInfoVersions, SlotStatus,
15};
16use chrono::{Local, Utc};
17use crossbeam::select;
18use crossbeam_channel::{Receiver, Sender, unbounded};
19use ipc_channel::{
20    ipc::{IpcOneShotServer, IpcReceiver},
21    router::RouterProxy,
22};
23use itertools::Itertools;
24use jsonrpc_core::MetaIoHandler;
25use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
26use jsonrpc_pubsub::{PubSubHandler, Session};
27use jsonrpc_ws_server::{RequestContext, ServerBuilder as WsServerBuilder};
28use libloading::{Library, Symbol};
29use serde::Serialize;
30use solana_commitment_config::CommitmentConfig;
31#[cfg(feature = "geyser_plugin")]
32use solana_geyser_plugin_manager::geyser_plugin_manager::{
33    GeyserPluginManager, LoadedGeyserPlugin,
34};
35use solana_message::SimpleAddressLoader;
36use solana_transaction::sanitized::{MessageHash, SanitizedTransaction};
37use solana_transaction_status::RewardsAndNumPartitions;
38#[cfg(feature = "subgraph")]
39use surfpool_subgraph::SurfpoolSubgraphPlugin;
40use surfpool_types::{
41    BlockProductionMode, ClockCommand, ClockEvent, DEFAULT_MAINNET_RPC_URL, DataIndexingCommand,
42    SimnetCommand, SimnetConfig, SimnetEvent, SubgraphCommand, SubgraphPluginConfig,
43    SurfpoolConfig,
44};
45type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
46use txtx_addon_kit::helpers::fs::FileLocation;
47
48use crate::{
49    PluginManagerCommand,
50    rpc::{
51        self, RunloopContext, SurfpoolMiddleware, SurfpoolWebsocketMeta,
52        SurfpoolWebsocketMiddleware, accounts_data::AccountsData, accounts_scan::AccountsScan,
53        admin::AdminRpc, bank_data::BankData, full::Full, minimal::Minimal,
54        surfnet_cheatcodes::SurfnetCheatcodes, ws::Rpc,
55    },
56    surfnet::{GeyserEvent, locker::SurfnetSvmLocker, remote::SurfnetRemoteClient},
57};
58
59const BLOCKHASH_SLOT_TTL: u64 = 75;
60
61/// Checks if a port is available for binding.
62fn check_port_availability(addr: SocketAddr, server_type: &str) -> Result<(), String> {
63    match std::net::TcpListener::bind(addr) {
64        Ok(_listener) => Ok(()),
65        Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
66            let msg = format!(
67                "{} port {} is already in use. Try --port or --ws-port to use a different port.",
68                server_type,
69                addr.port()
70            );
71            eprintln!("Error: {}", msg);
72            Err(msg)
73        }
74        Err(e) => {
75            let msg = format!("Failed to bind {} server to {}: {}", server_type, addr, e);
76            eprintln!("Error: {}", msg);
77            Err(msg)
78        }
79    }
80}
81
82pub async fn start_local_surfnet_runloop(
83    svm_locker: SurfnetSvmLocker,
84    config: SurfpoolConfig,
85    subgraph_commands_tx: Sender<SubgraphCommand>,
86    simnet_commands_tx: Sender<SimnetCommand>,
87    simnet_commands_rx: Receiver<SimnetCommand>,
88    geyser_events_rx: Receiver<GeyserEvent>,
89) -> Result<(), Box<dyn std::error::Error>> {
90    let Some(simnet) = config.simnets.first() else {
91        return Ok(());
92    };
93    let block_production_mode = simnet.block_production_mode.clone();
94
95    let remote_rpc_client = match simnet.offline_mode {
96        true => None,
97        false => SurfnetRemoteClient::new_unsafe(
98            simnet
99                .remote_rpc_url
100                .as_ref()
101                .unwrap_or(&DEFAULT_MAINNET_RPC_URL.to_string()),
102        ),
103    };
104
105    svm_locker
106        .initialize(
107            simnet.slot_time,
108            &remote_rpc_client,
109            simnet.instruction_profiling_enabled,
110            simnet.log_bytes_limit,
111        )
112        .await?;
113
114    svm_locker.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
115
116    // Load snapshot accounts if provided
117    if !simnet.snapshot.is_empty() {
118        match svm_locker
119            .load_snapshot(
120                &simnet.snapshot,
121                remote_rpc_client.as_ref(),
122                CommitmentConfig::confirmed(),
123            )
124            .await
125        {
126            Ok(loaded_count) => {
127                let _ = svm_locker.with_svm_reader(|svm| {
128                    svm.simnet_events_tx.send(SimnetEvent::info(format!(
129                        "Preloaded {} accounts from snapshot(s) into SVM",
130                        loaded_count
131                    )))
132                });
133            }
134            Err(e) => {
135                let _ = svm_locker.with_svm_reader(|svm| {
136                    svm.simnet_events_tx.send(SimnetEvent::warn(format!(
137                        "Error loading snapshot accounts: {}",
138                        e
139                    )))
140                });
141            }
142        }
143    }
144
145    let simnet_events_tx_cc = svm_locker.simnet_events_tx();
146
147    let (plugin_manager_commands_rx, _rpc_handle, _ws_handle) = start_rpc_servers_runloop(
148        &config,
149        &simnet_commands_tx,
150        svm_locker.clone(),
151        &remote_rpc_client,
152    )
153    .await?;
154
155    let simnet_config = simnet.clone();
156
157    match start_geyser_runloop(
158        config.plugin_config_path.clone(),
159        plugin_manager_commands_rx,
160        subgraph_commands_tx.clone(),
161        simnet_events_tx_cc.clone(),
162        geyser_events_rx,
163    ) {
164        Ok(_) => {}
165        Err(e) => {
166            let _ =
167                simnet_events_tx_cc.send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
168        }
169    };
170
171    let (clock_event_rx, clock_command_tx) =
172        start_clock_runloop(simnet_config.slot_time, Some(simnet_events_tx_cc.clone()));
173
174    // Emit TransactionProcessed events for each stored transaction before Ready
175    let initial_transaction_count = svm_locker.with_svm_reader(|svm| {
176        let iter_result = svm.transactions.into_iter();
177        let mut count: u64 = 0;
178
179        if let Ok(iter) = iter_result {
180            let mut events = vec![];
181            for (_, status) in iter {
182                if let Some((tx_meta, _updated_accounts)) = status.as_processed() {
183                    let signature = tx_meta.transaction.signatures[0];
184                    let err = tx_meta.meta.status.clone().err();
185
186                    // Build TransactionMetadata from stored data
187                    let meta = surfpool_types::TransactionMetadata {
188                        signature,
189                        logs: tx_meta.meta.log_messages.clone().unwrap_or_default(),
190                        inner_instructions: tx_meta
191                            .meta
192                            .inner_instructions
193                            .clone()
194                            .unwrap_or_default()
195                            .into_iter()
196                            .map(|inner_ixs| {
197                                inner_ixs
198                                    .instructions
199                                    .into_iter()
200                                    .map(|ix| solana_message::inner_instruction::InnerInstruction {
201                                        instruction: ix.instruction,
202                                        stack_height: ix.stack_height.unwrap_or(1) as u8,
203                                    })
204                                    .collect()
205                            })
206                            .collect(),
207                        compute_units_consumed: tx_meta.meta.compute_units_consumed.unwrap_or(0),
208                        return_data: tx_meta.meta.return_data.clone().unwrap_or_default(),
209                        fee: tx_meta.meta.fee,
210                    };
211
212                    events.push((
213                        tx_meta.slot,
214                        SimnetEvent::TransactionProcessed(Local::now(), meta, err.clone()),
215                    ));
216
217                    count += 1;
218                }
219            }
220            for (_, event) in events
221                .into_iter()
222                .sorted_by(|(a_slot, _), (b_slot, _)| a_slot.cmp(b_slot))
223            {
224                let _ = svm.simnet_events_tx.send(event);
225            }
226        }
227
228        count
229    });
230    let _ = simnet_events_tx_cc.send(SimnetEvent::Ready(initial_transaction_count));
231
232    // Notify geyser plugins that startup is complete
233    let _ = svm_locker.with_svm_reader(|svm| svm.geyser_events_tx.send(GeyserEvent::EndOfStartup));
234
235    start_block_production_runloop(
236        clock_event_rx,
237        clock_command_tx,
238        simnet_commands_rx,
239        simnet_commands_tx.clone(),
240        svm_locker,
241        block_production_mode,
242        &remote_rpc_client,
243        simnet_config.expiry.map(|e| e * 1000),
244        &simnet_config,
245    )
246    .await
247}
248
249#[allow(clippy::too_many_arguments)]
250pub async fn start_block_production_runloop(
251    clock_event_rx: Receiver<ClockEvent>,
252    clock_command_tx: Sender<ClockCommand>,
253    simnet_commands_rx: Receiver<SimnetCommand>,
254    simnet_commands_tx: Sender<SimnetCommand>,
255    svm_locker: SurfnetSvmLocker,
256    mut block_production_mode: BlockProductionMode,
257    remote_rpc_client: &Option<SurfnetRemoteClient>,
258    expiry_duration_ms: Option<u64>,
259    simnet_config: &SimnetConfig,
260) -> Result<(), Box<dyn std::error::Error>> {
261    let remote_client_with_commitment = remote_rpc_client.as_ref().map(|c| {
262        (
263            c.clone(),
264            solana_commitment_config::CommitmentConfig::confirmed(),
265        )
266    });
267    let mut next_scheduled_expiry_check: Option<u64> =
268        expiry_duration_ms.map(|expiry_val| Utc::now().timestamp_millis() as u64 + expiry_val);
269    let global_skip_sig_verify = simnet_config.skip_signature_verification;
270    loop {
271        let mut do_produce_block = false;
272
273        select! {
274            recv(clock_event_rx) -> msg => if let Ok(event) = msg {
275                match event {
276                    ClockEvent::Tick => {
277                        if block_production_mode.eq(&BlockProductionMode::Clock) {
278                            do_produce_block = true;
279                        }
280
281                        if let Some(expiry_ms) = expiry_duration_ms {
282                            if let Some(scheduled_time_ref) = &mut next_scheduled_expiry_check {
283                                let now_ms = Utc::now().timestamp_millis() as u64;
284                                if now_ms >= *scheduled_time_ref {
285                                    let svm = svm_locker.0.read().await;
286                                    if svm.updated_at + expiry_ms < now_ms {
287                                        let _ = simnet_commands_tx.send(SimnetCommand::Terminate(None));
288                                    } else {
289                                        *scheduled_time_ref = svm.updated_at + expiry_ms;
290                                    }
291                                }
292                            }
293                        }
294                    }
295                    ClockEvent::ExpireBlockHash => {
296                        do_produce_block = true;
297                    }
298                }
299            },
300            recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
301                match event {
302                    SimnetCommand::SlotForward(_key) => {
303                        block_production_mode = BlockProductionMode::Manual;
304                        do_produce_block = true;
305                    }
306                    SimnetCommand::SlotBackward(_key) => {
307
308                    }
309                    SimnetCommand::CommandClock(_, update) => {
310                        if let ClockCommand::UpdateSlotInterval(updated_slot_time) = update {
311                            svm_locker.with_svm_writer(|svm_writer| {
312                                svm_writer.slot_time = updated_slot_time;
313                            });
314                        }
315
316                        // Handle PauseWithConfirmation specially
317                        if let ClockCommand::PauseWithConfirmation(response_tx) = update {
318                            // Get current slot and slot_time before pausing
319                            let (current_slot, slot_time) = svm_locker.with_svm_reader(|svm_reader| {
320                                (svm_reader.latest_epoch_info.absolute_slot, svm_reader.slot_time)
321                            });
322
323                            // Send Pause to clock runloop
324                            let _ = clock_command_tx.send(ClockCommand::Pause);
325
326                            // Give the clock time to process the pause command
327                            tokio::time::sleep(tokio::time::Duration::from_millis(slot_time / 2)).await;
328
329                            // Loop and check if the slot has stopped advancing
330                            let max_attempts = 10;
331                            let mut attempts = 0;
332                            loop {
333                                tokio::time::sleep(tokio::time::Duration::from_millis(slot_time)).await;
334
335                                let new_slot = svm_locker.with_svm_reader(|svm_reader| {
336                                    svm_reader.latest_epoch_info.absolute_slot
337                                });
338
339                                // If slot hasn't changed, clock has stopped
340                                if new_slot == current_slot || attempts >= max_attempts {
341                                    break;
342                                }
343
344                                attempts += 1;
345                            }
346
347                            // Read epoch info after clock has stopped
348                            let epoch_info = svm_locker.with_svm_reader(|svm_reader| {
349                                svm_reader.latest_epoch_info.clone()
350                            });
351                            // Send response
352                            let _ = response_tx.send(epoch_info);
353                        } else {
354                            let _ = clock_command_tx.send(update);
355                        }
356                        continue
357                    }
358                    SimnetCommand::UpdateInternalClock(_, clock) => {
359                        // Confirm the current block to materialize any scheduled overrides for this slot
360                        if let Err(e) = svm_locker.confirm_current_block(&remote_client_with_commitment).await {
361                            let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!(
362                                "Failed to confirm block after time travel: {}", e
363                            )));
364                        }
365
366                        svm_locker.with_svm_writer(|svm_writer| {
367                            svm_writer.inner.set_sysvar(&clock);
368                            svm_writer.updated_at = clock.unix_timestamp as u64 * 1_000;
369                            svm_writer.latest_epoch_info.absolute_slot = clock.slot;
370                            svm_writer.latest_epoch_info.epoch = clock.epoch;
371                            svm_writer.latest_epoch_info.slot_index = clock.slot;
372                            svm_writer.latest_epoch_info.epoch = clock.epoch;
373                            svm_writer.latest_epoch_info.absolute_slot = clock.slot + clock.epoch * svm_writer.latest_epoch_info.slots_in_epoch;
374                            let _ = svm_writer.simnet_events_tx.send(SimnetEvent::SystemClockUpdated(clock));
375                        });
376                    }
377                    SimnetCommand::UpdateInternalClockWithConfirmation(_, clock, response_tx) => {
378                        // Confirm the current block to materialize any scheduled overrides for this slot
379                        if let Err(e) = svm_locker.confirm_current_block(&remote_client_with_commitment).await {
380                            let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!(
381                                "Failed to confirm block after time travel: {}", e
382                            )));
383                        }
384
385                        let epoch_info = svm_locker.with_svm_writer(|svm_writer| {
386                            svm_writer.inner.set_sysvar(&clock);
387                            svm_writer.updated_at = clock.unix_timestamp as u64 * 1_000;
388                            svm_writer.latest_epoch_info.absolute_slot = clock.slot;
389                            svm_writer.latest_epoch_info.epoch = clock.epoch;
390                            svm_writer.latest_epoch_info.slot_index = clock.slot;
391                            svm_writer.latest_epoch_info.epoch = clock.epoch;
392                            svm_writer.latest_epoch_info.absolute_slot = clock.slot + clock.epoch * svm_writer.latest_epoch_info.slots_in_epoch;
393                            let _ = svm_writer.simnet_events_tx.send(SimnetEvent::SystemClockUpdated(clock));
394                            svm_writer.latest_epoch_info.clone()
395                        });
396
397                        // Send confirmation back
398                        let _ = response_tx.send(epoch_info);
399                    }
400                    SimnetCommand::UpdateBlockProductionMode(update) => {
401                        block_production_mode = update;
402                        continue
403                    }
404                    SimnetCommand::ProcessTransaction(_key, transaction, status_tx, skip_preflight, skip_sig_verify_override) => {
405                       let skip_sig_verify = skip_sig_verify_override.unwrap_or(global_skip_sig_verify);
406                       let sigverify = !skip_sig_verify;
407                       if let Err(e) = svm_locker.process_transaction(&remote_client_with_commitment, transaction, status_tx, skip_preflight, sigverify).await {
408                            let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!("Failed to process transaction: {}", e)));
409                       }
410                       if block_production_mode.eq(&BlockProductionMode::Transaction) {
411                           do_produce_block = true;
412                       }
413                    }
414                    SimnetCommand::Terminate(_) => {
415                        // Explicitly shutdown storage to trigger WAL checkpoint before exiting
416                        svm_locker.shutdown();
417                        break;
418                    }
419                    SimnetCommand::StartRunbookExecution(runbook_id) => {
420                        svm_locker.start_runbook_execution(runbook_id);
421                    }
422                    SimnetCommand::CompleteRunbookExecution(runbook_id, error) => {
423                        svm_locker.complete_runbook_execution(runbook_id, error);
424                    }
425                    SimnetCommand::FetchRemoteAccounts(pubkeys, remote_url) => {
426                        let remote_client = SurfnetRemoteClient::new_unsafe(&remote_url);
427                        if let Some(remote_client) = remote_client {
428                              match svm_locker.get_multiple_accounts_with_remote_fallback(&remote_client, &pubkeys, CommitmentConfig::confirmed()).await {
429                                 Ok(account_updates) => {
430                                     svm_locker.write_multiple_account_updates(&account_updates.inner);
431                                 }
432                                 Err(e) => {
433                                     svm_locker.simnet_events_tx().try_send(SimnetEvent::error(format!("Failed to fetch remote accounts {:?}: {}", pubkeys, e))).ok();
434                                 }
435                             };
436                        }
437                    }
438                    SimnetCommand::AirdropProcessed => {
439                       if block_production_mode.eq(&BlockProductionMode::Transaction) {
440                           do_produce_block = true;
441                       }
442                    }
443                }
444            },
445        }
446
447        {
448            if do_produce_block {
449                svm_locker
450                    .confirm_current_block(&remote_client_with_commitment)
451                    .await?;
452            }
453        }
454    }
455    Ok(())
456}
457
458pub fn start_clock_runloop(
459    mut slot_time: u64,
460    simnet_events_tx: Option<Sender<SimnetEvent>>,
461) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
462    let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
463    let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
464
465    let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
466        let mut enabled = true;
467        let mut block_hash_timeout = Instant::now();
468
469        loop {
470            match clock_command_rx.try_recv() {
471                Ok(ClockCommand::Pause) => {
472                    enabled = false;
473                    if let Some(ref simnet_events_tx) = simnet_events_tx {
474                        let _ =
475                            simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Pause));
476                    }
477                }
478                Ok(ClockCommand::Resume) => {
479                    enabled = true;
480                    if let Some(ref simnet_events_tx) = simnet_events_tx {
481                        let _ =
482                            simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Resume));
483                    }
484                }
485                Ok(ClockCommand::Toggle) => {
486                    enabled = !enabled;
487                    if let Some(ref simnet_events_tx) = simnet_events_tx {
488                        let _ =
489                            simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Toggle));
490                    }
491                }
492                Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
493                    slot_time = updated_slot_time;
494                }
495                Ok(ClockCommand::PauseWithConfirmation(_)) => {
496                    // This should be handled in the block production runloop, not here
497                    // If it reaches here, just treat it as a regular Pause
498                    enabled = false;
499                    if let Some(ref simnet_events_tx) = simnet_events_tx {
500                        let _ =
501                            simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Pause));
502                    }
503                }
504                Err(_e) => {}
505            }
506            sleep(Duration::from_millis(slot_time));
507            if enabled {
508                let _ = clock_event_tx.send(ClockEvent::Tick);
509                // Todo: the block expiration is not completely accurate.
510                if block_hash_timeout.elapsed()
511                    > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
512                {
513                    let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
514                    block_hash_timeout = Instant::now();
515                }
516            }
517        }
518    });
519
520    (clock_event_rx, clock_command_tx)
521}
522
523fn start_geyser_runloop(
524    plugin_config_paths: Vec<PathBuf>,
525    plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
526    subgraph_commands_tx: Sender<SubgraphCommand>,
527    simnet_events_tx: Sender<SimnetEvent>,
528    geyser_events_rx: Receiver<GeyserEvent>,
529) -> Result<JoinHandle<Result<(), String>>, String> {
530    let handle: JoinHandle<Result<(), String>> = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
531        let mut indexing_enabled = false;
532
533        #[cfg(feature = "geyser_plugin")]
534        let mut plugin_manager = GeyserPluginManager::new();
535        #[cfg(not(feature = "geyser_plugin"))]
536        let mut plugin_manager = ();
537
538        let mut surfpool_plugin_manager: Vec<Box<dyn GeyserPlugin>> = vec![];
539
540        // Map between each plugin's UUID to its entry (index, plugin_name)
541        let mut plugin_map: HashMap<crate::Uuid, (usize, String)> = HashMap::new();
542
543        // helper to log errors that can't be propagated
544        let log_error = |msg:String|{
545            let _ = simnet_events_tx.send(SimnetEvent::error(msg));
546        };
547
548        let log_warn = |msg:String|{
549            let _ = simnet_events_tx.send(SimnetEvent::warn(msg));
550        };
551
552        let log_info = |msg:String|{
553            let _ = simnet_events_tx.send(SimnetEvent::info(msg));
554        };
555
556
557        #[cfg(feature = "geyser_plugin")]
558        for plugin_config_path in plugin_config_paths.into_iter() {
559            let plugin_manifest_location = FileLocation::from_path(plugin_config_path);
560            let config_file = plugin_manifest_location.read_content_as_utf8()?;
561            let result: serde_json::Value = match json5::from_str(&config_file) {
562                Ok(res) => res,
563                Err(e) => {
564                    let error = format!("Unable to read manifest: {}", e);
565                    let _ = simnet_events_tx.send(SimnetEvent::error(error.clone()));
566                    return Err(error)
567                }
568            };
569
570            let plugin_dylib_path = match result.get("libpath").map(|p| p.as_str()) {
571                Some(Some(name)) => name,
572                _ => {
573                    let error = format!("Plugin config file should include a 'libpath' field: {}", plugin_manifest_location);
574                    let _ = simnet_events_tx.send(SimnetEvent::error(error.clone()));
575                    return Err(error)
576                }
577            };
578
579            let mut plugin_dylib_location = plugin_manifest_location.get_parent_location().expect("path invalid");
580            plugin_dylib_location.append_path(&plugin_dylib_path).expect("path invalid");
581
582            let (plugin, lib) = unsafe {
583                let lib = match Library::new(&plugin_dylib_location.to_string()) {
584                    Ok(lib) => lib,
585                    Err(e) => {
586                        log_error(format!("Unable to load plugin {}: {}", plugin_dylib_location.to_string(), e.to_string()));
587                        continue;
588                    }
589                };
590                let constructor: Symbol<PluginConstructor> = lib
591                    .get(b"_create_plugin")
592                    .map_err(|e| format!("{}", e.to_string()))?;
593                let plugin_raw = constructor();
594                (Box::from_raw(plugin_raw), lib)
595            };
596            indexing_enabled = true;
597
598            let mut plugin = LoadedGeyserPlugin::new(lib, plugin, None);
599            if let Err(e) = plugin.on_load(&plugin_manifest_location.to_string(), false) {
600                let error = format!("Unable to load plugin:: {}", e.to_string());
601                let _ = simnet_events_tx.send(SimnetEvent::error(error.clone()));
602                return Err(error)
603            }
604
605            plugin_manager.plugins.push(plugin);
606        }
607
608        let ipc_router = RouterProxy::new();
609
610        // Helper function to load a subgraph plugin
611        #[cfg(feature = "subgraph")]
612        let load_subgraph_plugin = |uuid: uuid::Uuid,
613                                      config: txtx_addon_network_svm_types::subgraph::PluginConfig,
614                                      notifier: crossbeam_channel::Sender<String>,
615                                      surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
616                                      plugin_map: &mut HashMap<uuid::Uuid, (usize, String)>,
617                                      indexing_enabled: &mut bool|
618         -> Result<(), String> {
619            if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::CreateCollection(
620                uuid,
621                config.data.clone(),
622                notifier,
623            )){
624                return Err(format!("Failed to send CreateCollection command: {:?}", e));
625            };
626
627            let mut plugin = SurfpoolSubgraphPlugin::default();
628
629            let (server, ipc_token) =
630                IpcOneShotServer::<IpcReceiver<DataIndexingCommand>>::new()
631                    .expect("Failed to create IPC one-shot server.");
632            let subgraph_plugin_config = SubgraphPluginConfig {
633                uuid,
634                ipc_token,
635                subgraph_request: config.data.clone(),
636            };
637
638            let config_file = serde_json::to_string(&subgraph_plugin_config)
639                .map_err(|e| format!("Failed to serialize subgraph plugin config: {:?}", e))?;
640
641            plugin
642                .on_load(&config_file, false)
643                .map_err(|e| format!("Failed to load Geyser plugin: {:?}", e))?;
644
645                match server.accept() {
646                    Ok((_, rx)) => {
647                        let subgraph_rx = ipc_router
648                            .route_ipc_receiver_to_new_crossbeam_receiver::<DataIndexingCommand>(rx);
649                        if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::ObserveCollection(subgraph_rx)) {
650                            return Err(format!("Failed to send ObserveCollection command: {:?}", e));
651                        }
652                    }
653                    Err(e) => {
654                        return Err(format!("Failed to accept IPC connection for subgraph {}: {:?}", uuid, e));
655                    }
656                };
657
658            *indexing_enabled = true;
659
660            let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
661            let plugin_index = surfpool_plugin_manager.len();
662            surfpool_plugin_manager.push(plugin);
663            plugin_map.insert(uuid, (plugin_index, config.plugin_name.to_string()));
664
665            Ok(())
666        };
667
668        // Helper function to unload a plugin by UUID
669        #[cfg(feature = "subgraph")]
670        let unload_plugin_by_uuid = |uuid: uuid::Uuid,
671                                       surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
672                                       plugin_map: &mut HashMap<uuid::Uuid, (usize, String)>,
673                                       indexing_enabled: &mut bool|
674         -> Result<(), String> {
675            let plugin_index = plugin_map
676                .get(&uuid)
677                .ok_or_else(|| format!("Plugin {} not found", uuid))?
678                .0;
679
680            if plugin_index >= surfpool_plugin_manager.len() {
681                return Err(format!("Plugin index {} out of bounds", plugin_index));
682            }
683
684            // Destroy database/schema for this collection
685            if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::DestroyCollection(uuid)){
686                return Err(format!("Failed to send DestroyCollection command for {}: {:?}", uuid, e));
687            }
688
689            // Unload the plugin
690            surfpool_plugin_manager[plugin_index].on_unload();
691
692            // Remove from tracking structures
693            surfpool_plugin_manager.remove(plugin_index);
694            plugin_map.remove(&uuid);
695
696            // Adjust indices after removal
697            for (index, _) in plugin_map.values_mut() {
698                if *index > plugin_index {
699                    *index -= 1;
700                }
701            }
702
703            // Disable indexing if no plugins remain
704            if surfpool_plugin_manager.is_empty() {
705                *indexing_enabled = false;
706                //  Add Logging When Indexing Disabled
707                log_info("All plugins unloaded,indexing disabled".to_string())
708            }
709
710            Ok(())
711        };
712
713        let err = loop {
714            use agave_geyser_plugin_interface::geyser_plugin_interface::{ReplicaAccountInfoV3, ReplicaAccountInfoVersions};
715
716            use crate::types::GeyserAccountUpdate;
717
718            select! {
719                recv(plugin_manager_commands_rx) -> msg => {
720                    match msg {
721                        Ok(event) => {
722                            match event {
723                                #[cfg(not(feature = "subgraph"))]
724                                PluginManagerCommand::LoadConfig(_, _, _) => {
725                                    continue;
726                                }
727                                #[cfg(feature = "subgraph")]
728                                PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
729                                    if let Err(e) = load_subgraph_plugin(uuid, config, notifier, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
730                                        let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load plugin: {}", e)));
731                                    }
732                                }
733                                #[cfg(not(feature = "subgraph"))]
734                                PluginManagerCommand::UnloadPlugin(_, _) => {
735                                    continue;
736                                }
737                                #[cfg(feature = "subgraph")]
738                                PluginManagerCommand::UnloadPlugin(uuid, notifier) => {
739                                    match  unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
740                                        Ok(_)=>{
741                                            log_info(format!("Successfully unloaded plugin with UUID {}", uuid));
742                                            let _ = notifier.send(Ok(()));
743                                        }
744                                        Err(e)=>{
745                                            log_error(format!("Failed to unload plugin {}: {}", uuid, e));
746                                            let _ = notifier.send(Err(e));
747                                        }
748                                    }
749                                }
750                                #[cfg(not(feature = "subgraph"))]
751                                PluginManagerCommand::ReloadPlugin(_, _, _) => {
752                                    continue;
753                                }
754                                #[cfg(feature = "subgraph")]
755                                PluginManagerCommand::ReloadPlugin(uuid, config, notifier) => {
756                                    // Unload the old plugin
757                                    match  unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
758                                        Ok(_)=>{
759                                            log_info(format!("Unloaded plugin with UUID {} for reload", uuid));
760
761                                            // Load the new plugin with the same UUID
762                                            match load_subgraph_plugin(uuid, config, notifier.clone(), &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
763                                                Ok(_)=>{
764                                                    log_info(format!("Successfully reloaded plugin with UUID {}", uuid));
765                                                    let _ = notifier.send(format!("Plugin {} reloaded successfully", uuid));
766                                                }
767                                                Err(e)=>{
768                                                    let error_msg = format!("Failed to reload plugin {}: {}", uuid, e);
769                                                    log_error(error_msg.clone());
770                                                    let _ = notifier.send(error_msg);
771                                                }
772                                            }
773                                        }
774                                        Err(e)=>{
775                                            let error_msg = format!("Failed to unload plugin {} during reload: {}", uuid, e);
776                                            log_error(error_msg.clone());
777                                            let _ = notifier.send(error_msg);
778                                        }
779                                    }
780                                }
781                                PluginManagerCommand::ListPlugins(notifier) => {
782                                    let plugin_list: Vec<crate::PluginInfo> = plugin_map.iter().map(|(uuid, (_, plugin_name))| {
783                                        crate::PluginInfo {
784                                            plugin_name: plugin_name.clone(),
785                                            uuid: uuid.to_string(),
786                                        }
787                                    }).collect();
788                                    let _ = notifier.send(plugin_list);
789                                }
790                            }
791                        },
792                        Err(e) => {
793                            break format!("Failed to read plugin manager command: {:?}", e);
794                        },
795                    }
796                },
797                recv(geyser_events_rx) -> msg => match msg {
798                    Err(e) => {
799                        break format!("Failed to read new transaction to send to Geyser plugin: {e}");
800                    },
801                    Ok(GeyserEvent::NotifyTransaction(transaction_with_status_meta, versioned_transaction)) => {
802
803                        if !indexing_enabled {
804                            continue;
805                        }
806
807                        let transaction = match versioned_transaction {
808                            Some(tx) => tx,
809                            None => {
810                                log_warn("Unable to index sanitized transaction".to_string());
811                                continue;
812                            }
813                        };
814
815                        let transaction_replica = ReplicaTransactionInfoV3 {
816                            signature: &transaction.signatures[0],
817                            is_vote: false,
818                            transaction: &transaction,
819                            transaction_status_meta: &transaction_with_status_meta.meta,
820                            index: 0,
821                            message_hash: &transaction.message.hash(),
822                        };
823
824                        for plugin in surfpool_plugin_manager.iter() {
825                            if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_3(&transaction_replica), transaction_with_status_meta.slot) {
826                                log_error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e))
827                            };
828                        }
829
830                        #[cfg(feature = "geyser_plugin")]
831                        for plugin in plugin_manager.plugins.iter() {
832                            if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_3(&transaction_replica), transaction_with_status_meta.slot) {
833                                log_error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e))
834                            };
835                        }
836                    }
837                    Ok(GeyserEvent::UpdateAccount(account_update)) => {
838                        let GeyserAccountUpdate {
839                            pubkey,
840                            account,
841                            slot,
842                            sanitized_transaction,
843                            write_version,
844                        } = account_update;
845
846                        let account_replica = ReplicaAccountInfoV3 {
847                            pubkey: pubkey.as_ref(),
848                            lamports: account.lamports,
849                            owner: account.owner.as_ref(),
850                            executable: account.executable,
851                            rent_epoch: account.rent_epoch,
852                            data: account.data.as_ref(),
853                            write_version,
854                            txn: sanitized_transaction.as_ref(),
855                        };
856
857                        for plugin in surfpool_plugin_manager.iter() {
858                            if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, false) {
859                                log_error(format!("Failed to update account in Geyser plugin: {:?}", e));
860                            }
861                        }
862
863                        #[cfg(feature = "geyser_plugin")]
864                        for plugin in plugin_manager.plugins.iter() {
865                            if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, false) {
866                                log_error(format!("Failed to update account in Geyser plugin: {:?}", e))
867                            }
868                        }
869                    }
870                    Ok(GeyserEvent::StartupAccountUpdate(account_update)) => {
871                        let GeyserAccountUpdate {
872                            pubkey,
873                            account,
874                            slot,
875                            sanitized_transaction,
876                            write_version,
877                        } = account_update;
878
879                        let account_replica = ReplicaAccountInfoV3 {
880                            pubkey: pubkey.as_ref(),
881                            lamports: account.lamports,
882                            owner: account.owner.as_ref(),
883                            executable: account.executable,
884                            rent_epoch: account.rent_epoch,
885                            data: account.data.as_ref(),
886                            write_version,
887                            txn: sanitized_transaction.as_ref(),
888                        };
889
890                        // Send startup account updates with is_startup=true
891                        for plugin in surfpool_plugin_manager.iter() {
892                            if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, true) {
893                                log_error(format!("Failed to send startup account update to Geyser plugin: {:?}", e));
894                            }
895                        }
896
897                        #[cfg(feature = "geyser_plugin")]
898                        for plugin in plugin_manager.plugins.iter() {
899                            if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, true) {
900                                log_error(format!("Failed to send startup account update to Geyser plugin: {:?}", e))
901                            }
902                        }
903                    }
904                    Ok(GeyserEvent::EndOfStartup) => {
905                        for plugin in surfpool_plugin_manager.iter() {
906                            if let Err(e) = plugin.notify_end_of_startup() {
907                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e)));
908                            }
909                        }
910
911                        #[cfg(feature = "geyser_plugin")]
912                        for plugin in plugin_manager.plugins.iter() {
913                            if let Err(e) = plugin.notify_end_of_startup() {
914                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e)));
915                            }
916                        }
917                    }
918                    Ok(GeyserEvent::UpdateSlotStatus { slot, parent, status }) => {
919                        let slot_status = match status {
920                            crate::surfnet::GeyserSlotStatus::Processed => SlotStatus::Processed,
921                            crate::surfnet::GeyserSlotStatus::Confirmed => SlotStatus::Confirmed,
922                            crate::surfnet::GeyserSlotStatus::Rooted => SlotStatus::Rooted,
923                        };
924
925                        for plugin in surfpool_plugin_manager.iter() {
926                            if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) {
927                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e)));
928                            }
929                        }
930
931                        #[cfg(feature = "geyser_plugin")]
932                        for plugin in plugin_manager.plugins.iter() {
933                            if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) {
934                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e)));
935                            }
936                        }
937                    }
938                    Ok(GeyserEvent::NotifyBlockMetadata(block_metadata)) => {
939                        let rewards = RewardsAndNumPartitions {
940                            rewards: vec![],
941                            num_partitions: None,
942                        };
943
944                        let block_info = ReplicaBlockInfoV4 {
945                            slot: block_metadata.slot,
946                            blockhash: &block_metadata.blockhash,
947                            rewards: &rewards,
948                            block_time: block_metadata.block_time,
949                            block_height: block_metadata.block_height,
950                            parent_slot: block_metadata.parent_slot,
951                            parent_blockhash: &block_metadata.parent_blockhash,
952                            executed_transaction_count: block_metadata.executed_transaction_count,
953                            entry_count: block_metadata.entry_count,
954                        };
955
956                        for plugin in surfpool_plugin_manager.iter() {
957                            if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) {
958                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e)));
959                            }
960                        }
961
962                        #[cfg(feature = "geyser_plugin")]
963                        for plugin in plugin_manager.plugins.iter() {
964                            if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) {
965                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e)));
966                            }
967                        }
968                    }
969                    Ok(GeyserEvent::NotifyEntry(entry_info)) => {
970                        let entry_replica = ReplicaEntryInfoV2 {
971                            slot: entry_info.slot,
972                            index: entry_info.index,
973                            num_hashes: entry_info.num_hashes,
974                            hash: &entry_info.hash,
975                            executed_transaction_count: entry_info.executed_transaction_count,
976                            starting_transaction_index: entry_info.starting_transaction_index,
977                        };
978
979                        for plugin in surfpool_plugin_manager.iter() {
980                            if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) {
981                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e)));
982                            }
983                        }
984
985                        #[cfg(feature = "geyser_plugin")]
986                        for plugin in plugin_manager.plugins.iter() {
987                            if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) {
988                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e)));
989                            }
990                        }
991                    }
992                }
993            }
994        };
995        Err(err)
996    }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
997    Ok(handle)
998}
999
1000async fn start_rpc_servers_runloop(
1001    config: &SurfpoolConfig,
1002    simnet_commands_tx: &Sender<SimnetCommand>,
1003    svm_locker: SurfnetSvmLocker,
1004    remote_rpc_client: &Option<SurfnetRemoteClient>,
1005) -> Result<
1006    (
1007        Receiver<PluginManagerCommand>,
1008        JoinHandle<()>,
1009        JoinHandle<()>,
1010    ),
1011    String,
1012> {
1013    let rpc_addr: SocketAddr = config
1014        .rpc
1015        .get_rpc_base_url()
1016        .parse()
1017        .map_err(|e: std::net::AddrParseError| e.to_string())?;
1018    let ws_addr: SocketAddr = config
1019        .rpc
1020        .get_ws_base_url()
1021        .parse()
1022        .map_err(|e: std::net::AddrParseError| e.to_string())?;
1023
1024    check_port_availability(rpc_addr, "RPC")?;
1025    check_port_availability(ws_addr, "WebSocket")?;
1026
1027    let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
1028    let simnet_events_tx = svm_locker.simnet_events_tx();
1029
1030    let middleware = SurfpoolMiddleware::new(
1031        svm_locker,
1032        simnet_commands_tx,
1033        &plugin_manager_commands_tx,
1034        &config.rpc,
1035        remote_rpc_client,
1036    );
1037
1038    let rpc_handle =
1039        start_http_rpc_server_runloop(config, middleware.clone(), simnet_events_tx.clone()).await?;
1040    let ws_handle = start_ws_rpc_server_runloop(config, middleware, simnet_events_tx).await?;
1041    Ok((plugin_manager_commands_rx, rpc_handle, ws_handle))
1042}
1043
1044async fn start_http_rpc_server_runloop(
1045    config: &SurfpoolConfig,
1046    middleware: SurfpoolMiddleware,
1047    simnet_events_tx: Sender<SimnetEvent>,
1048) -> Result<JoinHandle<()>, String> {
1049    let server_bind: SocketAddr = config
1050        .rpc
1051        .get_rpc_base_url()
1052        .parse::<SocketAddr>()
1053        .map_err(|e| e.to_string())?;
1054
1055    let mut io = MetaIoHandler::with_middleware(middleware);
1056    io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
1057    io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
1058    io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
1059    io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
1060    io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
1061    io.extend_with(rpc::surfnet_cheatcodes::SurfnetCheatcodesRpc.to_delegate());
1062    io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
1063
1064    if !config.plugin_config_path.is_empty() {
1065        io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
1066    }
1067
1068    let _handle = hiro_system_kit::thread_named("RPC Handler")
1069        .spawn(move || {
1070            let server = match ServerBuilder::new(io)
1071                .cors(DomainsValidation::Disabled)
1072                .threads(6)
1073                .max_request_body_size(15 * 1024 * 1024)
1074                .start_http(&server_bind)
1075            {
1076                Ok(server) => server,
1077                Err(e) => {
1078                    let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
1079                        "Failed to start RPC server: {:?}",
1080                        e
1081                    )));
1082                    return;
1083                }
1084            };
1085
1086            server.wait();
1087            let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
1088        })
1089        .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
1090
1091    Ok(_handle)
1092}
1093async fn start_ws_rpc_server_runloop(
1094    config: &SurfpoolConfig,
1095    middleware: SurfpoolMiddleware,
1096    simnet_events_tx: Sender<SimnetEvent>,
1097) -> Result<JoinHandle<()>, String> {
1098    let ws_server_bind: SocketAddr = config
1099        .rpc
1100        .get_ws_base_url()
1101        .parse::<SocketAddr>()
1102        .map_err(|e| e.to_string())?;
1103
1104    let uid = std::sync::atomic::AtomicUsize::new(0);
1105    let ws_middleware = SurfpoolWebsocketMiddleware::new(middleware.clone(), None);
1106
1107    let mut rpc_io = PubSubHandler::new(MetaIoHandler::with_middleware(ws_middleware));
1108
1109    let _ws_handle = hiro_system_kit::thread_named("WebSocket RPC Handler")
1110        .spawn(move || {
1111            // The pubsub handler needs to be able to run async tasks, so we create a Tokio runtime here
1112            let runtime = tokio::runtime::Builder::new_multi_thread()
1113                .enable_all()
1114                .build()
1115                .expect("Failed to build Tokio runtime");
1116
1117            let tokio_handle = runtime.handle();
1118            rpc_io.extend_with(
1119                rpc::ws::SurfpoolWsRpc {
1120                    uid,
1121                    signature_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1122                    account_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1123                    program_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1124                    slot_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1125                    logs_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1126                    snapshot_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1127                    tokio_handle: tokio_handle.clone(),
1128                }
1129                .to_delegate(),
1130            );
1131            runtime.block_on(async move {
1132                let server = match WsServerBuilder::new(rpc_io)
1133                    .session_meta_extractor(move |ctx: &RequestContext| {
1134                        // Create meta from context + session
1135                        let runloop_context = RunloopContext {
1136                            id: None,
1137                            svm_locker: middleware.surfnet_svm.clone(),
1138                            simnet_commands_tx: middleware.simnet_commands_tx.clone(),
1139                            plugin_manager_commands_tx: middleware
1140                                .plugin_manager_commands_tx
1141                                .clone(),
1142                            remote_rpc_client: middleware.remote_rpc_client.clone(),
1143                            rpc_config: middleware.config.clone(),
1144                        };
1145                        Some(SurfpoolWebsocketMeta::new(
1146                            runloop_context,
1147                            Some(Arc::new(Session::new(ctx.sender()))),
1148                        ))
1149                    })
1150                    .start(&ws_server_bind)
1151                {
1152                    Ok(server) => server,
1153                    Err(e) => {
1154                        let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
1155                            "Failed to start WebSocket RPC server: {:?}",
1156                            e
1157                        )));
1158                        return;
1159                    }
1160                };
1161                // The server itself is blocking, so spawn it in a separate thread if needed
1162                tokio::task::spawn_blocking(move || {
1163                    server.wait().unwrap();
1164                })
1165                .await
1166                .ok();
1167
1168                let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
1169            });
1170        })
1171        .map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?;
1172    Ok(_ws_handle)
1173}