surfpool_core/runloops/
mod.rs

1use solana_message::{v0::LoadedAddresses, SimpleAddressLoader};
2use solana_sdk::transaction::MessageHash;
3use solana_transaction::sanitized::SanitizedTransaction;
4use solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta};
5use std::{
6    collections::HashSet,
7    net::SocketAddr,
8    sync::Arc,
9    thread::{sleep, JoinHandle},
10    time::{Duration, Instant},
11};
12use surfpool_subgraph::SurfpoolSubgraphPlugin;
13use tokio::sync::RwLock;
14
15use crate::{
16    rpc::{
17        self, accounts_data::AccountsData, accounts_scan::AccountsScan, admin::AdminRpc,
18        bank_data::BankData, full::Full, minimal::Minimal, svm_tricks::SvmTricksRpc,
19        SurfpoolMiddleware,
20    },
21    surfnet::{GeyserEvent, SurfnetSvm},
22    PluginManagerCommand,
23};
24use agave_geyser_plugin_interface::geyser_plugin_interface::{
25    GeyserPlugin, ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
26};
27use crossbeam::select;
28use crossbeam_channel::{unbounded, Receiver, Sender};
29use ipc_channel::{
30    ipc::{IpcOneShotServer, IpcReceiver},
31    router::RouterProxy,
32};
33use jsonrpc_core::MetaIoHandler;
34use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
35use surfpool_types::{
36    BlockProductionMode, ClockCommand, ClockEvent, SchemaDataSourcingEvent, SubgraphPluginConfig,
37};
38use surfpool_types::{SimnetCommand, SimnetEvent, SubgraphCommand, SurfpoolConfig};
39
/// Multiplier applied to the slot interval (in milliseconds) by the clock
/// runloop: once more than `BLOCKHASH_SLOT_TTL * slot_time` milliseconds have
/// elapsed since the previous expiry, it emits `ClockEvent::ExpireBlockHash`.
40const BLOCKHASH_SLOT_TTL: u64 = 75;
41
42pub async fn start_local_surfnet_runloop(
43    mut surfnet_svm: SurfnetSvm,
44    config: SurfpoolConfig,
45    subgraph_commands_tx: Sender<SubgraphCommand>,
46    simnet_commands_tx: Sender<SimnetCommand>,
47    simnet_commands_rx: Receiver<SimnetCommand>,
48    geyser_events_rx: Receiver<GeyserEvent>,
49) -> Result<(), Box<dyn std::error::Error>> {
50    let Some(simnet) = config.simnets.first() else {
51        return Ok(());
52    };
53    let block_production_mode = simnet.block_production_mode.clone();
54    surfnet_svm.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
55    let _ = surfnet_svm.connect(&simnet.remote_rpc_url).await?;
56
57    // Question: can the value `slots_in_epoch` fluctuate over time?
58
59    let simnet_events_tx_cc = surfnet_svm.simnet_events_tx.clone();
60    let svm_locker = Arc::new(RwLock::new(surfnet_svm));
61    let (plugin_manager_commands_rx, _rpc_handle) =
62        start_rpc_server_runloop(&config, &simnet_commands_tx, svm_locker.clone()).await?;
63
64    let simnet_config = simnet.clone();
65
66    if !config.plugin_config_path.is_empty() {
67        match start_geyser_runloop(
68            plugin_manager_commands_rx,
69            subgraph_commands_tx.clone(),
70            simnet_events_tx_cc.clone(),
71            geyser_events_rx,
72        ) {
73            Ok(_) => {}
74            Err(e) => {
75                let _ = simnet_events_tx_cc
76                    .send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
77            }
78        };
79    }
80
81    let (clock_event_rx, clock_command_tx) = start_clock_runloop(simnet_config.slot_time);
82
83    start_block_production_runloop(
84        clock_event_rx,
85        clock_command_tx,
86        simnet_commands_rx,
87        svm_locker,
88        block_production_mode,
89    )
90    .await
91}
92
93pub async fn start_block_production_runloop(
94    clock_event_rx: Receiver<ClockEvent>,
95    clock_command_tx: Sender<ClockCommand>,
96    simnet_commands_rx: Receiver<SimnetCommand>,
97    svm_locker: Arc<RwLock<SurfnetSvm>>,
98    mut block_production_mode: BlockProductionMode,
99) -> Result<(), Box<dyn std::error::Error>> {
100    loop {
101        let mut do_produce_block = false;
102
103        select! {
104            recv(clock_event_rx) -> msg => if let Ok(event) = msg {
105                match event {
106                    ClockEvent::Tick => {
107                        if block_production_mode.eq(&BlockProductionMode::Clock) {
108                            do_produce_block = true;
109                        }
110                    }
111                    ClockEvent::ExpireBlockHash => {
112                        do_produce_block = true;
113                    }
114                }
115            },
116            recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
117                match event {
118                    SimnetCommand::SlotForward(_key) => {
119                        block_production_mode = BlockProductionMode::Manual;
120                        do_produce_block = true;
121                    }
122                    SimnetCommand::SlotBackward(_key) => {
123
124                    }
125                    SimnetCommand::UpdateClock(update) => {
126                        let _ = clock_command_tx.send(update);
127                        continue
128                    }
129                    SimnetCommand::UpdateBlockProductionMode(update) => {
130                        block_production_mode = update;
131                        continue
132                    }
133                    SimnetCommand::TransactionReceived(_key, transaction, status_tx, skip_preflight) => {
134                        let mut svm_writer = svm_locker.write().await;
135                        svm_writer.process_transaction(transaction, status_tx ,skip_preflight).await?;
136                    }
137                    SimnetCommand::Terminate(_) => {
138                        std::process::exit(0)
139                    }
140                }
141            },
142        }
143
144        {
145            if do_produce_block {
146                let mut svm_writer = svm_locker.write().await;
147                svm_writer.confirm_transactions()?;
148                svm_writer.finalize_transactions()?;
149                svm_writer.new_blockhash();
150            }
151        }
152    }
153}
154
155pub fn start_clock_runloop(mut slot_time: u64) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
156    let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
157    let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
158
159    let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
160        let mut enabled = true;
161        let mut block_hash_timeout = Instant::now();
162
163        loop {
164            match clock_command_rx.try_recv() {
165                Ok(ClockCommand::Pause) => {
166                    enabled = false;
167                }
168                Ok(ClockCommand::Resume) => {
169                    enabled = true;
170                }
171                Ok(ClockCommand::Toggle) => {
172                    enabled = !enabled;
173                }
174                Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
175                    slot_time = updated_slot_time;
176                }
177                Err(_e) => {}
178            }
179            sleep(Duration::from_millis(slot_time));
180            if enabled {
181                let _ = clock_event_tx.send(ClockEvent::Tick);
182                // Todo: the block expiration is not completely accurate.
183                if block_hash_timeout.elapsed()
184                    > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
185                {
186                    let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
187                    block_hash_timeout = Instant::now();
188                }
189            }
190        }
191    });
192
193    (clock_event_rx, clock_command_tx)
194}
195
196fn start_geyser_runloop(
197    plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
198    subgraph_commands_tx: Sender<SubgraphCommand>,
199    simnet_events_tx: Sender<SimnetEvent>,
200    geyser_events_rx: Receiver<GeyserEvent>,
201) -> Result<JoinHandle<Result<(), String>>, String> {
202    let handle = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
203        let mut plugin_manager = vec![];
204
205        let ipc_router = RouterProxy::new();
206        // Note:
207        // At the moment, surfpool-subgraph is the only plugin that we're mounting.
208        // Please open an issue http://github.com/txtx/surfpool/issues/new if this is a feature you need!
209        //
210        // Proof of concept:
211        //
212        // let geyser_plugin_config_file = PathBuf::from("../../surfpool_subgraph_plugin.json");
213        // let contents = "{\"name\": \"surfpool-subgraph\", \"libpath\": \"target/release/libsurfpool_subgraph.dylib\"}";
214        // let result: serde_json::Value = json5::from_str(&contents).unwrap();
215        // let libpath = result["libpath"]
216        //     .as_str()
217        //     .unwrap();
218        // let mut libpath = PathBuf::from(libpath);
219        // if libpath.is_relative() {
220        //     let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
221        //         GeyserPluginManagerError::CannotOpenConfigFile(format!(
222        //             "Failed to resolve parent of {geyser_plugin_config_file:?}",
223        //         ))
224        //     }).unwrap();
225        //     libpath = config_dir.join(libpath);
226        // }
227        // let plugin_name = result["name"].as_str().map(|s| s.to_owned()).unwrap_or(format!("surfpool-subgraph"));
228        // let (plugin, lib) = unsafe {
229        //     let lib = match Library::new(&surfpool_subgraph_path) {
230        //         Ok(lib) => lib,
231        //         Err(e) => {
232        //             let _ = simnet_events_tx_copy.send(SimnetEvent::ErrorLog(Local::now(), format!("Unable to load plugin {}: {}", plugin_name, e.to_string())));
233        //             continue;
234        //         }
235        //     };
236        //     let constructor: Symbol<PluginConstructor> = lib
237        //         .get(b"_create_plugin")
238        //         .map_err(|e| format!("{}", e.to_string()))?;
239        //     let plugin_raw = constructor();
240        //     (Box::from_raw(plugin_raw), lib)
241        // };
242
243        let err = loop {
244            select! {
245                recv(plugin_manager_commands_rx) -> msg => {
246                    match msg {
247                        Ok(event) => {
248                            match event {
249                                PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
250                                    let _ = subgraph_commands_tx.send(SubgraphCommand::CreateSubgraph(uuid, config.data.clone(), notifier));
251                                    let mut plugin = SurfpoolSubgraphPlugin::default();
252
253                                    let (server, ipc_token) = IpcOneShotServer::<IpcReceiver<SchemaDataSourcingEvent>>::new().expect("Failed to create IPC one-shot server.");
254                                    let subgraph_plugin_config = SubgraphPluginConfig {
255                                        uuid,
256                                        ipc_token,
257                                        subgraph_request: config.data.clone()
258                                    };
259
260                                    let config_file = match serde_json::to_string(&subgraph_plugin_config) {
261                                        Ok(c) => c,
262                                        Err(e) => {
263                                            let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to serialize subgraph plugin config: {:?}", e)));
264                                            continue;
265                                        }
266                                    };
267
268                                    if let Err(e) = plugin.on_load(&config_file, false) {
269                                        let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load Geyser plugin: {:?}", e)));
270                                    };
271                                    if let Ok((_, rx)) = server.accept() {
272                                        let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver::<SchemaDataSourcingEvent>(rx);
273                                        let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveSubgraph(subgraph_rx));
274                                    };
275                                    let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
276                                    plugin_manager.push(plugin);
277                                    let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
278                                }
279                            }
280                        },
281                        Err(e) => {
282                            break format!("Failed to read plugin manager command: {:?}", e);
283                        },
284                    }
285                },
286                recv(geyser_events_rx) -> msg => match msg {
287                    Err(e) => {
288                        break format!("Failed to read new transaction to send to Geyser plugin: {e}");
289                    },
290                    Ok(GeyserEvent::NewTransaction(transaction, transaction_metadata, slot)) => {
291                        let mut inner_instructions = vec![];
292                        for (i,inner) in transaction_metadata.inner_instructions.iter().enumerate() {
293                            inner_instructions.push(
294                                InnerInstructions {
295                                    index: i as u8,
296                                    instructions: inner.iter().map(|i| InnerInstruction {
297                                        instruction: i.instruction.clone(),
298                                        stack_height: Some(i.stack_height as u32)
299                                    }).collect()
300                                }
301                            )
302                        }
303
304                        let transaction_status_meta = TransactionStatusMeta {
305                            status: Ok(()),
306                            fee: 0,
307                            pre_balances: vec![],
308                            post_balances: vec![],
309                            inner_instructions: Some(inner_instructions),
310                            log_messages: Some(transaction_metadata.logs.clone()),
311                            pre_token_balances: None,
312                            post_token_balances: None,
313                            rewards: None,
314                            loaded_addresses: LoadedAddresses {
315                                writable: vec![],
316                                readonly: vec![],
317                            },
318                            return_data: Some(transaction_metadata.return_data.clone()),
319                            compute_units_consumed: Some(transaction_metadata.compute_units_consumed),
320                        };
321
322                        let transaction = match SanitizedTransaction::try_create(transaction, MessageHash::Compute, None, SimpleAddressLoader::Disabled, &HashSet::new()) {
323                        Ok(tx) => tx,
324                            Err(e) => {
325                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: failed to serialize transaction: {:?}", e)));
326                                continue;
327                            }
328                        };
329
330                        let transaction_replica = ReplicaTransactionInfoV2 {
331                            signature: &transaction_metadata.signature,
332                            is_vote: false,
333                            transaction: &transaction,
334                            transaction_status_meta: &transaction_status_meta,
335                            index: 0
336                        };
337                        for plugin in plugin_manager.iter() {
338                            if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), slot) {
339                                let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
340                            };
341                        }
342                    }
343                }
344            }
345        };
346        Err(err)
347    }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
348    Ok(handle)
349}
350
351async fn start_rpc_server_runloop(
352    config: &SurfpoolConfig,
353    simnet_commands_tx: &Sender<SimnetCommand>,
354    svm_locker: Arc<RwLock<SurfnetSvm>>,
355) -> Result<(Receiver<PluginManagerCommand>, JoinHandle<()>), String> {
356    let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
357    let simnet_events_tx = svm_locker.read().await.simnet_events_tx.clone();
358
359    let middleware = SurfpoolMiddleware::new(
360        svm_locker,
361        &simnet_commands_tx,
362        &plugin_manager_commands_tx,
363        &config.rpc,
364    );
365    let server_bind: SocketAddr = config
366        .rpc
367        .get_socket_address()
368        .parse::<SocketAddr>()
369        .map_err(|e| e.to_string())?;
370
371    let mut io = MetaIoHandler::with_middleware(middleware);
372    io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
373    io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
374    io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
375    io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
376    io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
377    io.extend_with(rpc::svm_tricks::SurfpoolSvmTricksRpc.to_delegate());
378    io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
379
380    if !config.plugin_config_path.is_empty() {
381        io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
382    }
383
384    let _ = std::net::TcpListener::bind(server_bind)
385        .map_err(|e| format!("Failed to start RPC server: {}", e))?;
386
387    let _handle = hiro_system_kit::thread_named("RPC Handler")
388        .spawn(move || {
389            let server = match ServerBuilder::new(io)
390                .cors(DomainsValidation::Disabled)
391                .start_http(&server_bind)
392            {
393                Ok(server) => server,
394                Err(e) => {
395                    let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
396                        "Failed to start RPC server: {:?}",
397                        e
398                    )));
399                    return;
400                }
401            };
402
403            server.wait();
404            let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
405        })
406        .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
407    Ok((plugin_manager_commands_rx, _handle))
408}