surfpool_core/runloops/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    net::SocketAddr,
4    sync::{Arc, RwLock},
5    thread::{sleep, JoinHandle},
6    time::{Duration, Instant},
7};
8
9use agave_geyser_plugin_interface::geyser_plugin_interface::{
10    GeyserPlugin, ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
11};
12use chrono::Utc;
13use crossbeam::select;
14use crossbeam_channel::{unbounded, Receiver, Sender};
15use ipc_channel::{
16    ipc::{IpcOneShotServer, IpcReceiver},
17    router::RouterProxy,
18};
19use jsonrpc_core::MetaIoHandler;
20use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
21use jsonrpc_pubsub::{PubSubHandler, Session};
22use jsonrpc_ws_server::{RequestContext, ServerBuilder as WsServerBuilder};
23use solana_message::{v0::LoadedAddresses, SimpleAddressLoader};
24use solana_sdk::transaction::MessageHash;
25use solana_transaction::sanitized::SanitizedTransaction;
26use solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta};
27use surfpool_subgraph::SurfpoolSubgraphPlugin;
28use surfpool_types::{
29    BlockProductionMode, ClockCommand, ClockEvent, SchemaDataSourcingEvent, SimnetCommand,
30    SimnetEvent, SubgraphCommand, SubgraphPluginConfig, SurfpoolConfig,
31};
32
33use crate::{
34    rpc::{
35        self, accounts_data::AccountsData, accounts_scan::AccountsScan, admin::AdminRpc,
36        bank_data::BankData, full::Full, minimal::Minimal, surfnet_cheatcodes::SvmTricksRpc,
37        ws::Rpc, RunloopContext, SurfpoolMiddleware, SurfpoolWebsocketMeta,
38        SurfpoolWebsocketMiddleware,
39    },
40    surfnet::{locker::SurfnetSvmLocker, remote::SurfnetRemoteClient, GeyserEvent},
41    PluginManagerCommand,
42};
43
/// Number of slot intervals that make up the blockhash expiry window.
///
/// The clock runloop emits `ClockEvent::ExpireBlockHash` once more than
/// `BLOCKHASH_SLOT_TTL * slot_time` milliseconds have elapsed since the last expiry.
const BLOCKHASH_SLOT_TTL: u64 = 75;
45
46pub async fn start_local_surfnet_runloop(
47    svm_locker: SurfnetSvmLocker,
48    config: SurfpoolConfig,
49    subgraph_commands_tx: Sender<SubgraphCommand>,
50    simnet_commands_tx: Sender<SimnetCommand>,
51    simnet_commands_rx: Receiver<SimnetCommand>,
52    geyser_events_rx: Receiver<GeyserEvent>,
53) -> Result<(), Box<dyn std::error::Error>> {
54    let Some(simnet) = config.simnets.first() else {
55        return Ok(());
56    };
57    let block_production_mode = simnet.block_production_mode.clone();
58
59    let remote_rpc_client = Some(SurfnetRemoteClient::new(&simnet.remote_rpc_url));
60
61    let _ = svm_locker.initialize(&remote_rpc_client).await?;
62
63    svm_locker.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
64    let simnet_events_tx_cc = svm_locker.simnet_events_tx();
65
66    let (plugin_manager_commands_rx, _rpc_handle, _ws_handle) = start_rpc_servers_runloop(
67        &config,
68        &simnet_commands_tx,
69        svm_locker.clone(),
70        &remote_rpc_client,
71    )
72    .await?;
73
74    let simnet_config = simnet.clone();
75
76    if !config.plugin_config_path.is_empty() {
77        match start_geyser_runloop(
78            plugin_manager_commands_rx,
79            subgraph_commands_tx.clone(),
80            simnet_events_tx_cc.clone(),
81            geyser_events_rx,
82        ) {
83            Ok(_) => {}
84            Err(e) => {
85                let _ = simnet_events_tx_cc
86                    .send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
87            }
88        };
89    }
90
91    let (clock_event_rx, clock_command_tx) = start_clock_runloop(simnet_config.slot_time);
92
93    let _ = simnet_events_tx_cc.send(SimnetEvent::Ready);
94
95    start_block_production_runloop(
96        clock_event_rx,
97        clock_command_tx,
98        simnet_commands_rx,
99        simnet_commands_tx.clone(),
100        svm_locker,
101        block_production_mode,
102        &remote_rpc_client,
103        simnet_config.expiry.map(|e| e * 1000),
104    )
105    .await
106}
107
108pub async fn start_block_production_runloop(
109    clock_event_rx: Receiver<ClockEvent>,
110    clock_command_tx: Sender<ClockCommand>,
111    simnet_commands_rx: Receiver<SimnetCommand>,
112    simnet_commands_tx: Sender<SimnetCommand>,
113    svm_locker: SurfnetSvmLocker,
114    mut block_production_mode: BlockProductionMode,
115    remote_rpc_client: &Option<SurfnetRemoteClient>,
116    expiry_duration_ms: Option<u64>,
117) -> Result<(), Box<dyn std::error::Error>> {
118    let mut next_scheduled_expiry_check: Option<u64> =
119        expiry_duration_ms.map(|expiry_val| Utc::now().timestamp_millis() as u64 + expiry_val);
120    loop {
121        let mut do_produce_block = false;
122
123        select! {
124            recv(clock_event_rx) -> msg => if let Ok(event) = msg {
125                match event {
126                    ClockEvent::Tick => {
127                        if block_production_mode.eq(&BlockProductionMode::Clock) {
128                            do_produce_block = true;
129                        }
130
131                        if let Some(expiry_ms) = expiry_duration_ms {
132                            if let Some(scheduled_time_ref) = &mut next_scheduled_expiry_check {
133                                let now_ms = Utc::now().timestamp_millis() as u64;
134                                if now_ms >= *scheduled_time_ref {
135                                    let svm = svm_locker.0.read().await;
136                                    if svm.updated_at + expiry_ms < now_ms {
137                                        let _ = simnet_commands_tx.send(SimnetCommand::Terminate(None));
138                                    } else {
139                                        *scheduled_time_ref = svm.updated_at + expiry_ms;
140                                    }
141                                }
142                            }
143                        }
144                    }
145                    ClockEvent::ExpireBlockHash => {
146                        do_produce_block = true;
147                    }
148                }
149            },
150            recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
151                match event {
152                    SimnetCommand::SlotForward(_key) => {
153                        block_production_mode = BlockProductionMode::Manual;
154                        do_produce_block = true;
155                    }
156                    SimnetCommand::SlotBackward(_key) => {
157
158                    }
159                    SimnetCommand::UpdateClock(update) => {
160                        let _ = clock_command_tx.send(update);
161                        continue
162                    }
163                    SimnetCommand::UpdateBlockProductionMode(update) => {
164                        block_production_mode = update;
165                        continue
166                    }
167                    SimnetCommand::TransactionReceived(_key, transaction, status_tx, skip_preflight) => {
168                        svm_locker.process_transaction(remote_rpc_client, transaction, status_tx, skip_preflight).await?;
169                    }
170                    SimnetCommand::Terminate(_) => {
171                        let _ = svm_locker.simnet_events_tx().send(SimnetEvent::Aborted("Terminated due to inactivity.".to_string()));
172                        break;
173                    }
174                }
175            },
176        }
177
178        {
179            if do_produce_block {
180                svm_locker.confirm_current_block()?;
181            }
182        }
183    }
184    Ok(())
185}
186
187pub fn start_clock_runloop(mut slot_time: u64) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
188    let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
189    let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
190
191    let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
192        let mut enabled = true;
193        let mut block_hash_timeout = Instant::now();
194
195        loop {
196            match clock_command_rx.try_recv() {
197                Ok(ClockCommand::Pause) => {
198                    enabled = false;
199                }
200                Ok(ClockCommand::Resume) => {
201                    enabled = true;
202                }
203                Ok(ClockCommand::Toggle) => {
204                    enabled = !enabled;
205                }
206                Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
207                    slot_time = updated_slot_time;
208                }
209                Err(_e) => {}
210            }
211            sleep(Duration::from_millis(slot_time));
212            if enabled {
213                let _ = clock_event_tx.send(ClockEvent::Tick);
214                // Todo: the block expiration is not completely accurate.
215                if block_hash_timeout.elapsed()
216                    > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
217                {
218                    let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
219                    block_hash_timeout = Instant::now();
220                }
221            }
222        }
223    });
224
225    (clock_event_rx, clock_command_tx)
226}
227
228fn start_geyser_runloop(
229    plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
230    subgraph_commands_tx: Sender<SubgraphCommand>,
231    simnet_events_tx: Sender<SimnetEvent>,
232    geyser_events_rx: Receiver<GeyserEvent>,
233) -> Result<JoinHandle<Result<(), String>>, String> {
234    let handle = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
235        let mut plugin_manager = vec![];
236
237        let ipc_router = RouterProxy::new();
238        // Note:
239        // At the moment, surfpool-subgraph is the only plugin that we're mounting.
240        // Please open an issue http://github.com/txtx/surfpool/issues/new if this is a feature you need!
241        //
242        // Proof of concept:
243        //
244        // let geyser_plugin_config_file = PathBuf::from("../../surfpool_subgraph_plugin.json");
245        // let contents = "{\"name\": \"surfpool-subgraph\", \"libpath\": \"target/release/libsurfpool_subgraph.dylib\"}";
246        // let result: serde_json::Value = json5::from_str(&contents).unwrap();
247        // let libpath = result["libpath"]
248        //     .as_str()
249        //     .unwrap();
250        // let mut libpath = PathBuf::from(libpath);
251        // if libpath.is_relative() {
252        //     let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
253        //         GeyserPluginManagerError::CannotOpenConfigFile(format!(
254        //             "Failed to resolve parent of {geyser_plugin_config_file:?}",
255        //         ))
256        //     }).unwrap();
257        //     libpath = config_dir.join(libpath);
258        // }
259        // let plugin_name = result["name"].as_str().map(|s| s.to_owned()).unwrap_or(format!("surfpool-subgraph"));
260        // let (plugin, lib) = unsafe {
261        //     let lib = match Library::new(&surfpool_subgraph_path) {
262        //         Ok(lib) => lib,
263        //         Err(e) => {
264        //             let _ = simnet_events_tx_copy.send(SimnetEvent::ErrorLog(Local::now(), format!("Unable to load plugin {}: {}", plugin_name, e.to_string())));
265        //             continue;
266        //         }
267        //     };
268        //     let constructor: Symbol<PluginConstructor> = lib
269        //         .get(b"_create_plugin")
270        //         .map_err(|e| format!("{}", e.to_string()))?;
271        //     let plugin_raw = constructor();
272        //     (Box::from_raw(plugin_raw), lib)
273        // };
274
275        let err = loop {
276            select! {
277                recv(plugin_manager_commands_rx) -> msg => {
278                    match msg {
279                        Ok(event) => {
280                            match event {
281                                PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
282                                    let _ = subgraph_commands_tx.send(SubgraphCommand::CreateSubgraph(uuid, config.data.clone(), notifier));
283                                    let mut plugin = SurfpoolSubgraphPlugin::default();
284
285                                    let (server, ipc_token) = IpcOneShotServer::<IpcReceiver<SchemaDataSourcingEvent>>::new().expect("Failed to create IPC one-shot server.");
286                                    let subgraph_plugin_config = SubgraphPluginConfig {
287                                        uuid,
288                                        ipc_token,
289                                        subgraph_request: config.data.clone()
290                                    };
291
292                                    let config_file = match serde_json::to_string(&subgraph_plugin_config) {
293                                        Ok(c) => c,
294                                        Err(e) => {
295                                            let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to serialize subgraph plugin config: {:?}", e)));
296                                            continue;
297                                        }
298                                    };
299
300                                    if let Err(e) = plugin.on_load(&config_file, false) {
301                                        let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load Geyser plugin: {:?}", e)));
302                                    };
303                                    if let Ok((_, rx)) = server.accept() {
304                                        let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver::<SchemaDataSourcingEvent>(rx);
305                                        let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveSubgraph(subgraph_rx));
306                                    };
307                                    let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
308                                    plugin_manager.push(plugin);
309                                    let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
310                                }
311                            }
312                        },
313                        Err(e) => {
314                            break format!("Failed to read plugin manager command: {:?}", e);
315                        },
316                    }
317                },
318                recv(geyser_events_rx) -> msg => match msg {
319                    Err(e) => {
320                        break format!("Failed to read new transaction to send to Geyser plugin: {e}");
321                    },
322                    Ok(GeyserEvent::NewTransaction(transaction, transaction_metadata, slot)) => {
323                        let mut inner_instructions = vec![];
324                        for (i,inner) in transaction_metadata.inner_instructions.iter().enumerate() {
325                            inner_instructions.push(
326                                InnerInstructions {
327                                    index: i as u8,
328                                    instructions: inner.iter().map(|i| InnerInstruction {
329                                        instruction: i.instruction.clone(),
330                                        stack_height: Some(i.stack_height as u32)
331                                    }).collect()
332                                }
333                            )
334                        }
335
336                        let transaction_status_meta = TransactionStatusMeta {
337                            status: Ok(()),
338                            fee: 0,
339                            pre_balances: vec![],
340                            post_balances: vec![],
341                            inner_instructions: Some(inner_instructions),
342                            log_messages: Some(transaction_metadata.logs.clone()),
343                            pre_token_balances: None,
344                            post_token_balances: None,
345                            rewards: None,
346                            loaded_addresses: LoadedAddresses {
347                                writable: vec![],
348                                readonly: vec![],
349                            },
350                            return_data: Some(transaction_metadata.return_data.clone()),
351                            compute_units_consumed: Some(transaction_metadata.compute_units_consumed),
352                        };
353
354                        let transaction = match SanitizedTransaction::try_create(transaction, MessageHash::Compute, None, SimpleAddressLoader::Disabled, &HashSet::new()) {
355                        Ok(tx) => tx,
356                            Err(e) => {
357                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: failed to serialize transaction: {:?}", e)));
358                                continue;
359                            }
360                        };
361
362                        let transaction_replica = ReplicaTransactionInfoV2 {
363                            signature: &transaction_metadata.signature,
364                            is_vote: false,
365                            transaction: &transaction,
366                            transaction_status_meta: &transaction_status_meta,
367                            index: 0
368                        };
369                        for plugin in plugin_manager.iter() {
370                            if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), slot) {
371                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
372                            };
373                        }
374                    }
375                }
376            }
377        };
378        Err(err)
379    }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
380    Ok(handle)
381}
382
383async fn start_rpc_servers_runloop(
384    config: &SurfpoolConfig,
385    simnet_commands_tx: &Sender<SimnetCommand>,
386    svm_locker: SurfnetSvmLocker,
387    remote_rpc_client: &Option<SurfnetRemoteClient>,
388) -> Result<
389    (
390        Receiver<PluginManagerCommand>,
391        JoinHandle<()>,
392        JoinHandle<()>,
393    ),
394    String,
395> {
396    let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
397    let simnet_events_tx = svm_locker.simnet_events_tx();
398
399    let middleware = SurfpoolMiddleware::new(
400        svm_locker,
401        simnet_commands_tx,
402        &plugin_manager_commands_tx,
403        &config.rpc,
404        remote_rpc_client,
405    );
406
407    let rpc_handle =
408        start_http_rpc_server_runloop(config, middleware.clone(), simnet_events_tx.clone()).await?;
409    let ws_handle = start_ws_rpc_server_runloop(config, middleware, simnet_events_tx).await?;
410    Ok((plugin_manager_commands_rx, rpc_handle, ws_handle))
411}
412
413async fn start_http_rpc_server_runloop(
414    config: &SurfpoolConfig,
415    middleware: SurfpoolMiddleware,
416    simnet_events_tx: Sender<SimnetEvent>,
417) -> Result<JoinHandle<()>, String> {
418    let server_bind: SocketAddr = config
419        .rpc
420        .get_socket_address()
421        .parse::<SocketAddr>()
422        .map_err(|e| e.to_string())?;
423
424    let mut io = MetaIoHandler::with_middleware(middleware.clone());
425    io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
426    io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
427    io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
428    io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
429    io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
430    io.extend_with(rpc::surfnet_cheatcodes::SurfnetCheatcodesRpc.to_delegate());
431    io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
432
433    if !config.plugin_config_path.is_empty() {
434        io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
435    }
436
437    let _ = std::net::TcpListener::bind(server_bind)
438        .map_err(|e| format!("Failed to start RPC server: {}", e))?;
439
440    let _handle = hiro_system_kit::thread_named("RPC Handler")
441        .spawn(move || {
442            let server = match ServerBuilder::new(io)
443                .cors(DomainsValidation::Disabled)
444                .start_http(&server_bind)
445            {
446                Ok(server) => server,
447                Err(e) => {
448                    let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
449                        "Failed to start RPC server: {:?}",
450                        e
451                    )));
452                    return;
453                }
454            };
455
456            server.wait();
457            let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
458        })
459        .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
460
461    Ok(_handle)
462}
463async fn start_ws_rpc_server_runloop(
464    config: &SurfpoolConfig,
465    middleware: SurfpoolMiddleware,
466    simnet_events_tx: Sender<SimnetEvent>,
467) -> Result<JoinHandle<()>, String> {
468    let ws_server_bind: SocketAddr = config
469        .rpc
470        .get_ws_address()
471        .parse::<SocketAddr>()
472        .map_err(|e| e.to_string())?;
473
474    let uid = std::sync::atomic::AtomicUsize::new(0);
475    let ws_middleware = SurfpoolWebsocketMiddleware::new(middleware.clone(), None);
476
477    let mut rpc_io = PubSubHandler::new(MetaIoHandler::with_middleware(ws_middleware));
478
479    let _ws_handle = hiro_system_kit::thread_named("WebSocket RPC Handler")
480        .spawn(move || {
481            // The pubsub handler needs to be able to run async tasks, so we create a Tokio runtime here
482            let runtime = tokio::runtime::Builder::new_multi_thread()
483                .enable_all()
484                .build()
485                .expect("Failed to build Tokio runtime");
486
487            let tokio_handle = runtime.handle();
488            rpc_io.extend_with(
489                rpc::ws::SurfpoolWsRpc {
490                    uid,
491                    signature_subscription_map: Arc::new(RwLock::new(HashMap::new())),
492                    account_subscription_map: Arc::new(RwLock::new(HashMap::new())),
493                    slot_subscription_map: Arc::new(RwLock::new(HashMap::new())),
494                    tokio_handle: tokio_handle.clone(),
495                }
496                .to_delegate(),
497            );
498            runtime.block_on(async move {
499                let server = match WsServerBuilder::new(rpc_io)
500                    .session_meta_extractor(move |ctx: &RequestContext| {
501                        // Create meta from context + session
502                        let runloop_context = RunloopContext {
503                            id: None,
504                            svm_locker: middleware.surfnet_svm.clone(),
505                            simnet_commands_tx: middleware.simnet_commands_tx.clone(),
506                            plugin_manager_commands_tx: middleware
507                                .plugin_manager_commands_tx
508                                .clone(),
509                            remote_rpc_client: middleware.remote_rpc_client.clone(),
510                        };
511                        Some(SurfpoolWebsocketMeta::new(
512                            runloop_context,
513                            Some(Arc::new(Session::new(ctx.sender()))),
514                        ))
515                    })
516                    .start(&ws_server_bind)
517                {
518                    Ok(server) => server,
519                    Err(e) => {
520                        let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
521                            "Failed to start WebSocket RPC server: {:?}",
522                            e
523                        )));
524                        return;
525                    }
526                };
527                // The server itself is blocking, so spawn it in a separate thread if needed
528                tokio::task::spawn_blocking(move || {
529                    server.wait().unwrap();
530                })
531                .await
532                .ok();
533
534                let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
535            });
536        })
537        .map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?;
538    Ok(_ws_handle)
539}