surfpool_core/runloops/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    net::SocketAddr,
4    sync::Arc,
5    thread::{sleep, JoinHandle},
6    time::{Duration, Instant},
7};
8
9use agave_geyser_plugin_interface::geyser_plugin_interface::{
10    GeyserPlugin, ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
11};
12use crossbeam::select;
13use crossbeam_channel::{unbounded, Receiver, Sender};
14use ipc_channel::{
15    ipc::{IpcOneShotServer, IpcReceiver},
16    router::RouterProxy,
17};
18use jsonrpc_core::MetaIoHandler;
19use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
20use jsonrpc_pubsub::{PubSubHandler, Session};
21use jsonrpc_ws_server::{RequestContext, ServerBuilder as WsServerBuilder};
22use solana_commitment_config::CommitmentConfig;
23use solana_message::{v0::LoadedAddresses, SimpleAddressLoader};
24use solana_sdk::transaction::MessageHash;
25use solana_transaction::sanitized::SanitizedTransaction;
26use solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta};
27use surfpool_subgraph::SurfpoolSubgraphPlugin;
28use surfpool_types::{
29    BlockProductionMode, ClockCommand, ClockEvent, SchemaDataSourcingEvent, SimnetCommand,
30    SimnetEvent, SubgraphCommand, SubgraphPluginConfig, SurfpoolConfig,
31};
32
33use crate::{
34    rpc::{
35        self, accounts_data::AccountsData, accounts_scan::AccountsScan, admin::AdminRpc,
36        bank_data::BankData, full::Full, minimal::Minimal, surfnet_cheatcodes::SvmTricksRpc,
37        ws::Rpc, RunloopContext, SurfpoolMiddleware, SurfpoolWebsocketMeta,
38        SurfpoolWebsocketMiddleware,
39    },
40    surfnet::{
41        locker::SurfnetSvmLocker,
42        remote::{SomeRemoteCtx, SurfnetRemoteClient},
43        GeyserEvent,
44    },
45    PluginManagerCommand,
46};
47
/// Number of slot intervals between two `ClockEvent::ExpireBlockHash` events: the clock
/// runloop multiplies this value by the current slot time (in milliseconds) to obtain the
/// expiration period.
const BLOCKHASH_SLOT_TTL: u64 = 75;
49
50pub async fn start_local_surfnet_runloop(
51    svm_locker: SurfnetSvmLocker,
52    config: SurfpoolConfig,
53    subgraph_commands_tx: Sender<SubgraphCommand>,
54    simnet_commands_tx: Sender<SimnetCommand>,
55    simnet_commands_rx: Receiver<SimnetCommand>,
56    geyser_events_rx: Receiver<GeyserEvent>,
57) -> Result<(), Box<dyn std::error::Error>> {
58    let Some(simnet) = config.simnets.first() else {
59        return Ok(());
60    };
61    let block_production_mode = simnet.block_production_mode.clone();
62
63    let remote_rpc_client = Some(SurfnetRemoteClient::new(&simnet.remote_rpc_url));
64
65    let _ = svm_locker.initialize(&remote_rpc_client).await?;
66
67    svm_locker.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
68    let simnet_events_tx_cc = svm_locker.simnet_events_tx();
69
70    let (plugin_manager_commands_rx, _rpc_handle, _ws_handle) = start_rpc_servers_runloop(
71        &config,
72        &simnet_commands_tx,
73        svm_locker.clone(),
74        &remote_rpc_client,
75    )
76    .await?;
77
78    let simnet_config = simnet.clone();
79
80    if !config.plugin_config_path.is_empty() {
81        match start_geyser_runloop(
82            plugin_manager_commands_rx,
83            subgraph_commands_tx.clone(),
84            simnet_events_tx_cc.clone(),
85            geyser_events_rx,
86        ) {
87            Ok(_) => {}
88            Err(e) => {
89                let _ = simnet_events_tx_cc
90                    .send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
91            }
92        };
93    }
94
95    let (clock_event_rx, clock_command_tx) = start_clock_runloop(simnet_config.slot_time);
96
97    let _ = simnet_events_tx_cc.send(SimnetEvent::Ready);
98
99    start_block_production_runloop(
100        clock_event_rx,
101        clock_command_tx,
102        simnet_commands_rx,
103        svm_locker,
104        block_production_mode,
105        &remote_rpc_client,
106    )
107    .await
108}
109
110pub async fn start_block_production_runloop(
111    clock_event_rx: Receiver<ClockEvent>,
112    clock_command_tx: Sender<ClockCommand>,
113    simnet_commands_rx: Receiver<SimnetCommand>,
114    svm_locker: SurfnetSvmLocker,
115    mut block_production_mode: BlockProductionMode,
116    remote_rpc_client: &Option<SurfnetRemoteClient>,
117) -> Result<(), Box<dyn std::error::Error>> {
118    loop {
119        let mut do_produce_block = false;
120
121        select! {
122            recv(clock_event_rx) -> msg => if let Ok(event) = msg {
123                match event {
124                    ClockEvent::Tick => {
125                        if block_production_mode.eq(&BlockProductionMode::Clock) {
126                            do_produce_block = true;
127                        }
128                    }
129                    ClockEvent::ExpireBlockHash => {
130                        do_produce_block = true;
131                    }
132                }
133            },
134            recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
135                match event {
136                    SimnetCommand::SlotForward(_key) => {
137                        block_production_mode = BlockProductionMode::Manual;
138                        do_produce_block = true;
139                    }
140                    SimnetCommand::SlotBackward(_key) => {
141
142                    }
143                    SimnetCommand::UpdateClock(update) => {
144                        let _ = clock_command_tx.send(update);
145                        continue
146                    }
147                    SimnetCommand::UpdateBlockProductionMode(update) => {
148                        block_production_mode = update;
149                        continue
150                    }
151                    SimnetCommand::TransactionReceived(_key, transaction, status_tx, skip_preflight) => {
152                        svm_locker.process_transaction(&remote_rpc_client.get_remote_ctx(CommitmentConfig::confirmed()), transaction, status_tx, skip_preflight).await?;
153                    }
154                    SimnetCommand::Terminate(_) => {
155                        std::process::exit(0)
156                    }
157                }
158            },
159        }
160
161        {
162            if do_produce_block {
163                svm_locker.confirm_current_block()?;
164            }
165        }
166    }
167}
168
169pub fn start_clock_runloop(mut slot_time: u64) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
170    let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
171    let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
172
173    let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
174        let mut enabled = true;
175        let mut block_hash_timeout = Instant::now();
176
177        loop {
178            match clock_command_rx.try_recv() {
179                Ok(ClockCommand::Pause) => {
180                    enabled = false;
181                }
182                Ok(ClockCommand::Resume) => {
183                    enabled = true;
184                }
185                Ok(ClockCommand::Toggle) => {
186                    enabled = !enabled;
187                }
188                Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
189                    slot_time = updated_slot_time;
190                }
191                Err(_e) => {}
192            }
193            sleep(Duration::from_millis(slot_time));
194            if enabled {
195                let _ = clock_event_tx.send(ClockEvent::Tick);
196                // Todo: the block expiration is not completely accurate.
197                if block_hash_timeout.elapsed()
198                    > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
199                {
200                    let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
201                    block_hash_timeout = Instant::now();
202                }
203            }
204        }
205    });
206
207    (clock_event_rx, clock_command_tx)
208}
209
210fn start_geyser_runloop(
211    plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
212    subgraph_commands_tx: Sender<SubgraphCommand>,
213    simnet_events_tx: Sender<SimnetEvent>,
214    geyser_events_rx: Receiver<GeyserEvent>,
215) -> Result<JoinHandle<Result<(), String>>, String> {
216    let handle = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
217        let mut plugin_manager = vec![];
218
219        let ipc_router = RouterProxy::new();
220        // Note:
221        // At the moment, surfpool-subgraph is the only plugin that we're mounting.
222        // Please open an issue http://github.com/txtx/surfpool/issues/new if this is a feature you need!
223        //
224        // Proof of concept:
225        //
226        // let geyser_plugin_config_file = PathBuf::from("../../surfpool_subgraph_plugin.json");
227        // let contents = "{\"name\": \"surfpool-subgraph\", \"libpath\": \"target/release/libsurfpool_subgraph.dylib\"}";
228        // let result: serde_json::Value = json5::from_str(&contents).unwrap();
229        // let libpath = result["libpath"]
230        //     .as_str()
231        //     .unwrap();
232        // let mut libpath = PathBuf::from(libpath);
233        // if libpath.is_relative() {
234        //     let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
235        //         GeyserPluginManagerError::CannotOpenConfigFile(format!(
236        //             "Failed to resolve parent of {geyser_plugin_config_file:?}",
237        //         ))
238        //     }).unwrap();
239        //     libpath = config_dir.join(libpath);
240        // }
241        // let plugin_name = result["name"].as_str().map(|s| s.to_owned()).unwrap_or(format!("surfpool-subgraph"));
242        // let (plugin, lib) = unsafe {
243        //     let lib = match Library::new(&surfpool_subgraph_path) {
244        //         Ok(lib) => lib,
245        //         Err(e) => {
246        //             let _ = simnet_events_tx_copy.send(SimnetEvent::ErrorLog(Local::now(), format!("Unable to load plugin {}: {}", plugin_name, e.to_string())));
247        //             continue;
248        //         }
249        //     };
250        //     let constructor: Symbol<PluginConstructor> = lib
251        //         .get(b"_create_plugin")
252        //         .map_err(|e| format!("{}", e.to_string()))?;
253        //     let plugin_raw = constructor();
254        //     (Box::from_raw(plugin_raw), lib)
255        // };
256
257        let err = loop {
258            select! {
259                recv(plugin_manager_commands_rx) -> msg => {
260                    match msg {
261                        Ok(event) => {
262                            match event {
263                                PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
264                                    let _ = subgraph_commands_tx.send(SubgraphCommand::CreateSubgraph(uuid, config.data.clone(), notifier));
265                                    let mut plugin = SurfpoolSubgraphPlugin::default();
266
267                                    let (server, ipc_token) = IpcOneShotServer::<IpcReceiver<SchemaDataSourcingEvent>>::new().expect("Failed to create IPC one-shot server.");
268                                    let subgraph_plugin_config = SubgraphPluginConfig {
269                                        uuid,
270                                        ipc_token,
271                                        subgraph_request: config.data.clone()
272                                    };
273
274                                    let config_file = match serde_json::to_string(&subgraph_plugin_config) {
275                                        Ok(c) => c,
276                                        Err(e) => {
277                                            let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to serialize subgraph plugin config: {:?}", e)));
278                                            continue;
279                                        }
280                                    };
281
282                                    if let Err(e) = plugin.on_load(&config_file, false) {
283                                        let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load Geyser plugin: {:?}", e)));
284                                    };
285                                    if let Ok((_, rx)) = server.accept() {
286                                        let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver::<SchemaDataSourcingEvent>(rx);
287                                        let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveSubgraph(subgraph_rx));
288                                    };
289                                    let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
290                                    plugin_manager.push(plugin);
291                                    let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
292                                }
293                            }
294                        },
295                        Err(e) => {
296                            break format!("Failed to read plugin manager command: {:?}", e);
297                        },
298                    }
299                },
300                recv(geyser_events_rx) -> msg => match msg {
301                    Err(e) => {
302                        break format!("Failed to read new transaction to send to Geyser plugin: {e}");
303                    },
304                    Ok(GeyserEvent::NewTransaction(transaction, transaction_metadata, slot)) => {
305                        let mut inner_instructions = vec![];
306                        for (i,inner) in transaction_metadata.inner_instructions.iter().enumerate() {
307                            inner_instructions.push(
308                                InnerInstructions {
309                                    index: i as u8,
310                                    instructions: inner.iter().map(|i| InnerInstruction {
311                                        instruction: i.instruction.clone(),
312                                        stack_height: Some(i.stack_height as u32)
313                                    }).collect()
314                                }
315                            )
316                        }
317
318                        let transaction_status_meta = TransactionStatusMeta {
319                            status: Ok(()),
320                            fee: 0,
321                            pre_balances: vec![],
322                            post_balances: vec![],
323                            inner_instructions: Some(inner_instructions),
324                            log_messages: Some(transaction_metadata.logs.clone()),
325                            pre_token_balances: None,
326                            post_token_balances: None,
327                            rewards: None,
328                            loaded_addresses: LoadedAddresses {
329                                writable: vec![],
330                                readonly: vec![],
331                            },
332                            return_data: Some(transaction_metadata.return_data.clone()),
333                            compute_units_consumed: Some(transaction_metadata.compute_units_consumed),
334                        };
335
336                        let transaction = match SanitizedTransaction::try_create(transaction, MessageHash::Compute, None, SimpleAddressLoader::Disabled, &HashSet::new()) {
337                        Ok(tx) => tx,
338                            Err(e) => {
339                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: failed to serialize transaction: {:?}", e)));
340                                continue;
341                            }
342                        };
343
344                        let transaction_replica = ReplicaTransactionInfoV2 {
345                            signature: &transaction_metadata.signature,
346                            is_vote: false,
347                            transaction: &transaction,
348                            transaction_status_meta: &transaction_status_meta,
349                            index: 0
350                        };
351                        for plugin in plugin_manager.iter() {
352                            if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), slot) {
353                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
354                            };
355                        }
356                    }
357                }
358            }
359        };
360        Err(err)
361    }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
362    Ok(handle)
363}
364
365async fn start_rpc_servers_runloop(
366    config: &SurfpoolConfig,
367    simnet_commands_tx: &Sender<SimnetCommand>,
368    svm_locker: SurfnetSvmLocker,
369    remote_rpc_client: &Option<SurfnetRemoteClient>,
370) -> Result<
371    (
372        Receiver<PluginManagerCommand>,
373        JoinHandle<()>,
374        JoinHandle<()>,
375    ),
376    String,
377> {
378    let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
379    let simnet_events_tx = svm_locker.simnet_events_tx();
380
381    let middleware = SurfpoolMiddleware::new(
382        svm_locker,
383        simnet_commands_tx,
384        &plugin_manager_commands_tx,
385        &config.rpc,
386        remote_rpc_client,
387    );
388
389    let rpc_handle =
390        start_http_rpc_server_runloop(config, middleware.clone(), simnet_events_tx.clone()).await?;
391    let ws_handle = start_ws_rpc_server_runloop(config, middleware, simnet_events_tx).await?;
392    Ok((plugin_manager_commands_rx, rpc_handle, ws_handle))
393}
394
395async fn start_http_rpc_server_runloop(
396    config: &SurfpoolConfig,
397    middleware: SurfpoolMiddleware,
398    simnet_events_tx: Sender<SimnetEvent>,
399) -> Result<JoinHandle<()>, String> {
400    let server_bind: SocketAddr = config
401        .rpc
402        .get_socket_address()
403        .parse::<SocketAddr>()
404        .map_err(|e| e.to_string())?;
405
406    let mut io = MetaIoHandler::with_middleware(middleware.clone());
407    io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
408    io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
409    io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
410    io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
411    io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
412    io.extend_with(rpc::surfnet_cheatcodes::SurfnetCheatcodesRpc.to_delegate());
413    io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
414
415    if !config.plugin_config_path.is_empty() {
416        io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
417    }
418
419    let _ = std::net::TcpListener::bind(server_bind)
420        .map_err(|e| format!("Failed to start RPC server: {}", e))?;
421
422    let _handle = hiro_system_kit::thread_named("RPC Handler")
423        .spawn(move || {
424            let server = match ServerBuilder::new(io)
425                .cors(DomainsValidation::Disabled)
426                .start_http(&server_bind)
427            {
428                Ok(server) => server,
429                Err(e) => {
430                    let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
431                        "Failed to start RPC server: {:?}",
432                        e
433                    )));
434                    return;
435                }
436            };
437
438            server.wait();
439            let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
440        })
441        .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
442
443    Ok(_handle)
444}
445async fn start_ws_rpc_server_runloop(
446    config: &SurfpoolConfig,
447    middleware: SurfpoolMiddleware,
448    simnet_events_tx: Sender<SimnetEvent>,
449) -> Result<JoinHandle<()>, String> {
450    let ws_server_bind: SocketAddr = config
451        .rpc
452        .get_ws_address()
453        .parse::<SocketAddr>()
454        .map_err(|e| e.to_string())?;
455
456    let uid = std::sync::atomic::AtomicUsize::new(0);
457    let subscription_map = Arc::new(std::sync::RwLock::new(HashMap::new()));
458    let ws_middleware = SurfpoolWebsocketMiddleware::new(middleware.clone(), None);
459
460    let mut rpc_io = PubSubHandler::new(MetaIoHandler::with_middleware(ws_middleware));
461
462    let _ws_handle = hiro_system_kit::thread_named("WebSocket RPC Handler")
463        .spawn(move || {
464            // The pubsub handler needs to be able to run async tasks, so we create a Tokio runtime here
465            let runtime = tokio::runtime::Builder::new_multi_thread()
466                .enable_all()
467                .build()
468                .expect("Failed to build Tokio runtime");
469
470            let tokio_handle = runtime.handle();
471            rpc_io.extend_with(
472                rpc::ws::SurfpoolWsRpc {
473                    uid,
474                    active: subscription_map,
475                    tokio_handle: tokio_handle.clone(),
476                }
477                .to_delegate(),
478            );
479            runtime.block_on(async move {
480                let server = match WsServerBuilder::new(rpc_io)
481                    .session_meta_extractor(move |ctx: &RequestContext| {
482                        // Create meta from context + session
483                        let runloop_context = RunloopContext {
484                            id: None,
485                            svm_locker: middleware.surfnet_svm.clone(),
486                            simnet_commands_tx: middleware.simnet_commands_tx.clone(),
487                            plugin_manager_commands_tx: middleware
488                                .plugin_manager_commands_tx
489                                .clone(),
490                            remote_rpc_client: middleware.remote_rpc_client.clone(),
491                        };
492                        Some(SurfpoolWebsocketMeta::new(
493                            runloop_context,
494                            Some(Arc::new(Session::new(ctx.sender()))),
495                        ))
496                    })
497                    .start(&ws_server_bind)
498                {
499                    Ok(server) => server,
500                    Err(e) => {
501                        let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
502                            "Failed to start WebSocket RPC server: {:?}",
503                            e
504                        )));
505                        return;
506                    }
507                };
508                // The server itself is blocking, so spawn it in a separate thread if needed
509                tokio::task::spawn_blocking(move || {
510                    server.wait().unwrap();
511                })
512                .await
513                .ok();
514
515                let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
516            });
517        })
518        .map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?;
519    Ok(_ws_handle)
520}