surfpool_core/simnet/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use chrono::{DateTime, Local, Utc};
use crossbeam_channel::Sender;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use litesvm::LiteSVM;
use solana_rpc_client::rpc_client::RpcClient;
use solana_sdk::{
    clock::Clock, epoch_info::EpochInfo, pubkey::Pubkey, transaction::VersionedTransaction,
};
use std::{
    net::SocketAddr,
    sync::{mpsc::channel, Arc, RwLock},
    thread::sleep,
    time::Duration,
};
use tokio::sync::broadcast;

use crate::{
    rpc::{
        self, accounts_data::AccountsData, bank_data::BankData, full::Full, minimal::Minimal,
        SurfpoolMiddleware,
    },
    types::SurfpoolConfig,
};

/// Mutable simnet state shared between the RPC middleware and the slot loop.
///
/// `start` wraps a single instance in `Arc<RwLock<_>>` and hands a clone of the
/// `Arc` to `SurfpoolMiddleware`, so both sides observe the same state.
pub struct GlobalState {
    /// Local LiteSVM instance that transactions are executed against.
    pub svm: LiteSVM,
    /// Transaction counter, initialised to 0 by `start`.
    // NOTE(review): nothing in this module increments it — presumably updated
    // elsewhere (e.g. by an RPC handler); confirm.
    pub transactions_processed: u64,
    /// Epoch/slot position: seeded from the remote RPC endpoint in `start`
    /// and advanced by one slot on every iteration of its loop.
    pub epoch_info: EpochInfo,
}

/// Events published by the simnet on the `simnet_events_tx` channel passed to
/// `start`.
#[derive(Debug)]
pub enum SimnetEvent {
    /// Sent by `start` right after the HTTP RPC server has been started.
    Ready,
    /// Carries a message string.
    // NOTE(review): no emitter is visible in this module; presumably signals a
    // fatal startup/runtime failure — confirm against the consumers.
    Aborted(String),
    /// Sent by the "rpc handler" thread once the RPC server's `wait()` returns.
    Shutdown,
    /// Sent once per slot with the `Clock` that is installed as a sysvar.
    ClockUpdate(Clock),
    /// Sent by `start` at startup with the epoch info fetched from the remote
    /// RPC endpoint.
    EpochInfoUpdate(EpochInfo),
    // NOTE(review): no emitter is visible in this module (the matching
    // `expire_blockhash` call in `start` is commented out) — confirm usage.
    BlockHashExpired,
    /// Info-level log line: local timestamp and message.
    InfoLog(DateTime<Local>, String),
    /// Error-level log line: local timestamp and message. `start` emits this
    /// when an account cannot be retrieved from the remote RPC endpoint.
    ErrorLog(DateTime<Local>, String),
    /// Warning-level log line: local timestamp and message.
    WarnLog(DateTime<Local>, String),
    /// Debug-level log line: local timestamp and message.
    DebugLog(DateTime<Local>, String),
    /// Sent for every transaction taken off the mempool channel, before it is
    /// processed; carries the local time of receipt and the transaction.
    TransactionReceived(DateTime<Local>, VersionedTransaction),
    /// Sent after an account was fetched from the remote RPC endpoint and
    /// written into the local SVM; carries the local time and the account key.
    AccountUpdate(DateTime<Local>, Pubkey),
}

pub async fn start(
    config: &SurfpoolConfig,
    simnet_events_tx: Sender<SimnetEvent>,
) -> Result<(), Box<dyn std::error::Error>> {
    let svm = LiteSVM::new();

    // Todo: should check config first
    let rpc_client = RpcClient::new(&config.simnet.remote_rpc_url);
    let epoch_info = rpc_client.get_epoch_info()?;
    // Question: can the value `slots_in_epoch` fluctuate over time?
    let slots_in_epoch = epoch_info.slots_in_epoch;

    let context = GlobalState {
        svm,
        transactions_processed: 0,
        epoch_info: epoch_info.clone(),
    };

    let context = Arc::new(RwLock::new(context));
    let (mempool_tx, mut mempool_rx) = broadcast::channel(1024);
    let middleware = SurfpoolMiddleware {
        context: context.clone(),
        mempool_tx,
        config: config.rpc.clone(),
    };

    let simnet_events_tx_copy = simnet_events_tx.clone();

    let server_bind: SocketAddr =
        format!("{}:{}", config.rpc.bind_address, config.rpc.bind_port).parse()?;

    let mut io = MetaIoHandler::with_middleware(middleware);
    io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
    io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
    io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
    io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());

    let server = ServerBuilder::new(io)
        .cors(DomainsValidation::Disabled)
        .start_http(&server_bind)?;

    let _ = simnet_events_tx_copy.send(SimnetEvent::Ready);
    let _handle = hiro_system_kit::thread_named("rpc handler").spawn(move || {
        server.wait();
        let _ = simnet_events_tx_copy.send(SimnetEvent::Shutdown);
    });

    let _ = simnet_events_tx.send(SimnetEvent::EpochInfoUpdate(epoch_info.clone()));

    loop {
        sleep(Duration::from_millis(config.simnet.slot_time));
        let unix_timestamp: i64 = Utc::now().timestamp();

        let Ok(mut ctx) = context.try_write() else {
            println!("unable to lock svm");
            continue;
        };

        while let Ok(tx) = mempool_rx.try_recv() {
            let _ =
                simnet_events_tx.send(SimnetEvent::TransactionReceived(Local::now(), tx.clone()));

            tx.verify_with_results();
            let tx = tx.into_legacy_transaction().unwrap();
            let message = &tx.message;
            // println!("Processing Transaction {:?}", tx);
            for instruction in &message.instructions {
                // The Transaction may not be sanitized at this point
                if instruction.program_id_index as usize >= message.account_keys.len() {
                    unreachable!();
                }
                let program_id = &message.account_keys[instruction.program_id_index as usize];
                if ctx.svm.get_account(&program_id).is_none() {
                    // println!("Retrieving account from Mainnet: {:?}", program_id);
                    let res = rpc_client.get_account(&program_id);
                    let event = match res {
                        Ok(account) => {
                            let _ = ctx.svm.set_account(*program_id, account);
                            SimnetEvent::AccountUpdate(Local::now(), program_id.clone())
                        }
                        Err(e) => SimnetEvent::ErrorLog(
                            Local::now(),
                            format!("unable to retrieve account: {}", e),
                        ),
                    };
                    let _ = simnet_events_tx.send(event);
                }
            }
            let res = ctx.svm.send_transaction(tx);
        }
        ctx.epoch_info.slot_index += 1;
        ctx.epoch_info.absolute_slot += 1;
        if ctx.epoch_info.slot_index > slots_in_epoch {
            ctx.epoch_info.slot_index = 0;
            ctx.epoch_info.epoch += 1;
        }
        // ctx.svm.expire_blockhash();
        let clock: Clock = Clock {
            slot: ctx.epoch_info.slot_index,
            epoch: ctx.epoch_info.epoch,
            unix_timestamp,
            epoch_start_timestamp: 0, // todo
            leader_schedule_epoch: 0, // todo
        };
        let _ = simnet_events_tx.send(SimnetEvent::ClockUpdate(clock.clone()));
        ctx.svm.set_sysvar(&clock);
    }
}