surfpool_core/rpc/
mod.rs

1use std::{future::Future, sync::Arc};
2
3use blake3::Hash;
4use crossbeam_channel::Sender;
5use jsonrpc_core::{
6    futures::{future::Either, FutureExt},
7    middleware, BoxFuture, Error, FutureResponse, Metadata, Middleware, Request, Response,
8};
9use jsonrpc_pubsub::{PubSubMetadata, Session};
10use solana_clock::Slot;
11use surfpool_types::{types::RpcConfig, SimnetCommand};
12
13use crate::{
14    error::{SurfpoolError, SurfpoolResult},
15    surfnet::{
16        locker::SurfnetSvmLocker,
17        remote::{SomeRemoteCtx, SurfnetRemoteClient},
18        svm::SurfnetSvm,
19    },
20    PluginManagerCommand,
21};
22
23pub mod accounts_data;
24pub mod accounts_scan;
25pub mod admin;
26pub mod bank_data;
27pub mod full;
28pub mod minimal;
29pub mod surfnet_cheatcodes;
30pub mod utils;
31pub mod ws;
32
33#[derive(PartialEq, Eq, Clone, Copy, Debug)]
34pub enum RpcHealthStatus {
35    Ok,
36    Behind { num_slots: Slot },
37    Unknown,
38}
39
40pub struct SurfpoolRpc;
41
42#[derive(Clone)]
43pub struct RunloopContext {
44    pub id: Option<Hash>,
45    pub svm_locker: SurfnetSvmLocker,
46    pub simnet_commands_tx: Sender<SimnetCommand>,
47    pub plugin_manager_commands_tx: Sender<PluginManagerCommand>,
48    pub remote_rpc_client: Option<SurfnetRemoteClient>,
49}
50
51pub struct SurfnetRpcContext<T> {
52    pub svm_locker: SurfnetSvmLocker,
53    pub remote_ctx: Option<(SurfnetRemoteClient, T)>,
54}
55
56trait State {
57    fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker>;
58    fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
59    where
60        F: Fn(&SurfnetSvm) -> T + Send + Sync,
61        T: Send + 'static;
62    fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>>;
63}
64
65impl State for Option<RunloopContext> {
66    fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker> {
67        // Retrieve svm state
68        let Some(ctx) = self else {
69            return Err(SurfpoolError::no_locker());
70        };
71        Ok(ctx.svm_locker.clone())
72    }
73
74    fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
75    where
76        F: Fn(&SurfnetSvm) -> T + Send + Sync,
77        T: Send + 'static,
78    {
79        let Some(ctx) = self else {
80            return Err(SurfpoolError::no_locker());
81        };
82        Ok(ctx.svm_locker.with_svm_reader(reader))
83    }
84
85    fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>> {
86        let Some(ctx) = self else {
87            return Err(SurfpoolError::no_locker());
88        };
89
90        Ok(SurfnetRpcContext {
91            svm_locker: ctx.svm_locker.clone(),
92            remote_ctx: ctx.remote_rpc_client.get_remote_ctx(input),
93        })
94    }
95}
96
97impl Metadata for RunloopContext {}
98
99#[derive(Clone)]
100pub struct SurfpoolMiddleware {
101    pub surfnet_svm: SurfnetSvmLocker,
102    pub simnet_commands_tx: Sender<SimnetCommand>,
103    pub plugin_manager_commands_tx: Sender<PluginManagerCommand>,
104    pub config: RpcConfig,
105    pub remote_rpc_client: Option<SurfnetRemoteClient>,
106}
107
108impl SurfpoolMiddleware {
109    pub fn new(
110        surfnet_svm: SurfnetSvmLocker,
111        simnet_commands_tx: &Sender<SimnetCommand>,
112        plugin_manager_commands_tx: &Sender<PluginManagerCommand>,
113        config: &RpcConfig,
114        remote_rpc_client: &Option<SurfnetRemoteClient>,
115    ) -> Self {
116        Self {
117            surfnet_svm,
118            simnet_commands_tx: simnet_commands_tx.clone(),
119            plugin_manager_commands_tx: plugin_manager_commands_tx.clone(),
120            config: config.clone(),
121            remote_rpc_client: remote_rpc_client.clone(),
122        }
123    }
124}
125
126impl Middleware<Option<RunloopContext>> for SurfpoolMiddleware {
127    type Future = FutureResponse;
128    type CallFuture = middleware::NoopCallFuture;
129
130    fn on_request<F, X>(
131        &self,
132        request: Request,
133        _meta: Option<RunloopContext>,
134        next: F,
135    ) -> Either<Self::Future, X>
136    where
137        F: FnOnce(Request, Option<RunloopContext>) -> X + Send,
138        X: Future<Output = Option<Response>> + Send + 'static,
139    {
140        let meta = Some(RunloopContext {
141            id: None,
142            svm_locker: self.surfnet_svm.clone(),
143            simnet_commands_tx: self.simnet_commands_tx.clone(),
144            plugin_manager_commands_tx: self.plugin_manager_commands_tx.clone(),
145            remote_rpc_client: self.remote_rpc_client.clone(),
146        });
147        Either::Left(Box::pin(next(request, meta).map(move |res| res)))
148    }
149}
150
151#[derive(Clone)]
152pub struct SurfpoolWebsocketMiddleware {
153    pub surfpool_middleware: SurfpoolMiddleware,
154    pub session: Option<Arc<Session>>,
155}
156
157impl SurfpoolWebsocketMiddleware {
158    pub fn new(surfpool_middleware: SurfpoolMiddleware, session: Option<Arc<Session>>) -> Self {
159        Self {
160            surfpool_middleware,
161            session,
162        }
163    }
164}
165
166impl Middleware<Option<SurfpoolWebsocketMeta>> for SurfpoolWebsocketMiddleware {
167    type Future = FutureResponse;
168    type CallFuture = middleware::NoopCallFuture;
169
170    fn on_request<F, X>(
171        &self,
172        request: Request,
173        meta: Option<SurfpoolWebsocketMeta>,
174        next: F,
175    ) -> Either<Self::Future, X>
176    where
177        F: FnOnce(Request, Option<SurfpoolWebsocketMeta>) -> X + Send,
178        X: Future<Output = Option<Response>> + Send + 'static,
179    {
180        let runloop_context = RunloopContext {
181            id: None,
182            svm_locker: self.surfpool_middleware.surfnet_svm.clone(),
183            simnet_commands_tx: self.surfpool_middleware.simnet_commands_tx.clone(),
184            plugin_manager_commands_tx: self.surfpool_middleware.plugin_manager_commands_tx.clone(),
185            remote_rpc_client: self.surfpool_middleware.remote_rpc_client.clone(),
186        };
187        let session = meta
188            .as_ref()
189            .and_then(|m| m.session.clone())
190            .or(self.session.clone());
191        let meta = Some(SurfpoolWebsocketMeta::new(runloop_context, session));
192        Either::Left(Box::pin(next(request, meta).map(move |res| res)))
193    }
194}
195
196#[derive(Clone)]
197pub struct SurfpoolWebsocketMeta {
198    pub runloop_context: RunloopContext,
199    pub session: Option<Arc<Session>>,
200}
201
202impl SurfpoolWebsocketMeta {
203    pub fn new(runloop_context: RunloopContext, session: Option<Arc<Session>>) -> Self {
204        Self {
205            runloop_context,
206            session,
207        }
208    }
209}
210
211impl State for Option<SurfpoolWebsocketMeta> {
212    fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker> {
213        let Some(ctx) = self else {
214            return Err(SurfpoolError::no_locker());
215        };
216        Ok(ctx.runloop_context.svm_locker.clone())
217    }
218
219    fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
220    where
221        F: Fn(&SurfnetSvm) -> T + Send + Sync,
222        T: Send + 'static,
223    {
224        let Some(ctx) = self else {
225            return Err(SurfpoolError::no_locker());
226        };
227        Ok(ctx.runloop_context.svm_locker.with_svm_reader(reader))
228    }
229
230    fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>> {
231        let Some(ctx) = self else {
232            return Err(SurfpoolError::no_locker());
233        };
234
235        Ok(SurfnetRpcContext {
236            svm_locker: ctx.runloop_context.svm_locker.clone(),
237            remote_ctx: ctx.runloop_context.remote_rpc_client.get_remote_ctx(input),
238        })
239    }
240}
241
242impl Metadata for SurfpoolWebsocketMeta {}
243impl PubSubMetadata for SurfpoolWebsocketMeta {
244    fn session(&self) -> Option<Arc<jsonrpc_pubsub::Session>> {
245        self.session.clone()
246    }
247}
248
249pub const NOT_IMPLEMENTED_CODE: i64 = -32051; // -32000 to -32099 are reserved by the json-rpc spec for custom errors
250pub const NOT_IMPLEMENTED_MSG: &str = "Method not yet implemented. If this endpoint is a priority for you, please open an issue here so we can prioritize: https://github.com/txtx/surfpool/issues";
251fn not_implemented_msg(method: &str) -> String {
252    format!("Method `{}` is not yet implemented. If this endpoint is a priority for you, please open an issue here so we can prioritize: https://github.com/txtx/surfpool/issues", method)
253}
254/// Helper function to return a `NotImplemented` JSON RPC error
255pub fn not_implemented_err<T>(method: &str) -> Result<T, Error> {
256    Err(Error {
257        code: jsonrpc_core::types::ErrorCode::ServerError(NOT_IMPLEMENTED_CODE),
258        message: not_implemented_msg(method),
259        data: None,
260    })
261}
262
263pub fn not_implemented_err_async<T>(method: &str) -> BoxFuture<Result<T, Error>> {
264    let method = method.to_string();
265    Box::pin(async move {
266        Err(Error {
267            code: jsonrpc_core::types::ErrorCode::ServerError(NOT_IMPLEMENTED_CODE),
268            message: not_implemented_msg(&method),
269            data: None,
270        })
271    })
272}