1use std::{future::Future, sync::Arc};
2
3use blake3::Hash;
4use crossbeam_channel::Sender;
5use jsonrpc_core::{
6 futures::{future::Either, FutureExt},
7 middleware, BoxFuture, Error, FutureResponse, Metadata, Middleware, Request, Response,
8};
9use jsonrpc_pubsub::{PubSubMetadata, Session};
10use solana_clock::Slot;
11use surfpool_types::{types::RpcConfig, SimnetCommand};
12
13use crate::{
14 error::{SurfpoolError, SurfpoolResult},
15 surfnet::{
16 locker::SurfnetSvmLocker,
17 remote::{SomeRemoteCtx, SurfnetRemoteClient},
18 svm::SurfnetSvm,
19 },
20 PluginManagerCommand,
21};
22
23pub mod accounts_data;
24pub mod accounts_scan;
25pub mod admin;
26pub mod bank_data;
27pub mod full;
28pub mod minimal;
29pub mod surfnet_cheatcodes;
30pub mod utils;
31pub mod ws;
32
/// Health state values for the RPC service.
///
/// The type is `Copy` and comparable, so it can be passed by value and tested with `==`.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RpcHealthStatus {
    /// Healthy.
    Ok,
    /// Carries a slot count in `num_slots`.
    // NOTE(review): presumably the number of slots this node lags behind — confirm against the
    // code that produces this value.
    Behind { num_slots: Slot },
    /// Health could not be determined.
    Unknown,
}
39
/// Field-less unit struct; nothing in this part of the file constructs or implements it.
pub struct SurfpoolRpc;
41
/// Context handed to RPC handlers as request metadata.
///
/// It implements the jsonrpc `Metadata` trait, and the middlewares in this file build a fresh
/// value for every request by cloning the handles they own.
#[derive(Clone)]
pub struct RunloopContext {
    /// Optional blake3 hash identifier; the middlewares in this file always set it to `None`.
    pub id: Option<Hash>,
    /// Handle to the SVM locker, through which the `SurfnetSvm` state is read.
    pub svm_locker: SurfnetSvmLocker,
    /// Sending half of the channel carrying `SimnetCommand` values.
    pub simnet_commands_tx: Sender<SimnetCommand>,
    /// Sending half of the channel carrying `PluginManagerCommand` values.
    pub plugin_manager_commands_tx: Sender<PluginManagerCommand>,
    /// Remote RPC client, when one is available.
    pub remote_rpc_client: Option<SurfnetRemoteClient>,
}
50
/// Locker handle plus optional remote context, as produced by `State::get_rpc_context`.
pub struct SurfnetRpcContext<T> {
    /// Handle to the SVM locker.
    pub svm_locker: SurfnetSvmLocker,
    /// Remote client paired with a caller-supplied value of type `T`, as returned by
    /// `get_remote_ctx`.
    // NOTE(review): presumably `None` when no remote client is configured — confirm in
    // `SomeRemoteCtx`.
    pub remote_ctx: Option<(SurfnetRemoteClient, T)>,
}
55
/// Accessors shared by the optional request-metadata types in this module.
///
/// Implemented in this file for `Option<RunloopContext>` and `Option<SurfpoolWebsocketMeta>`;
/// both implementations return `SurfpoolError::no_locker()` when the metadata is `None`.
trait State {
    /// Returns a clone of the SVM locker handle carried by the metadata.
    fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker>;
    /// Passes `reader` to the locker's `with_svm_reader` and returns the value it produces.
    fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
    where
        F: Fn(&SurfnetSvm) -> T + Send + Sync,
        T: Send + 'static;
    /// Builds a `SurfnetRpcContext` from the locker handle and the result of calling
    /// `get_remote_ctx(input)` on the metadata's optional remote client.
    fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>>;
}
64
65impl State for Option<RunloopContext> {
66 fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker> {
67 let Some(ctx) = self else {
69 return Err(SurfpoolError::no_locker());
70 };
71 Ok(ctx.svm_locker.clone())
72 }
73
74 fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
75 where
76 F: Fn(&SurfnetSvm) -> T + Send + Sync,
77 T: Send + 'static,
78 {
79 let Some(ctx) = self else {
80 return Err(SurfpoolError::no_locker());
81 };
82 Ok(ctx.svm_locker.with_svm_reader(reader))
83 }
84
85 fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>> {
86 let Some(ctx) = self else {
87 return Err(SurfpoolError::no_locker());
88 };
89
90 Ok(SurfnetRpcContext {
91 svm_locker: ctx.svm_locker.clone(),
92 remote_ctx: ctx.remote_rpc_client.get_remote_ctx(input),
93 })
94 }
95}
96
// Marker impl: lets `RunloopContext` be used as jsonrpc request metadata.
impl Metadata for RunloopContext {}
98
/// jsonrpc middleware that owns the handles from which each request's `RunloopContext` is built.
#[derive(Clone)]
pub struct SurfpoolMiddleware {
    /// Handle to the SVM locker; cloned into each request's context.
    pub surfnet_svm: SurfnetSvmLocker,
    /// Sending half of the channel carrying `SimnetCommand` values.
    pub simnet_commands_tx: Sender<SimnetCommand>,
    /// Sending half of the channel carrying `PluginManagerCommand` values.
    pub plugin_manager_commands_tx: Sender<PluginManagerCommand>,
    /// RPC configuration; stored here but not read by the code in this part of the file.
    pub config: RpcConfig,
    /// Remote RPC client, when one is available.
    pub remote_rpc_client: Option<SurfnetRemoteClient>,
}
107
108impl SurfpoolMiddleware {
109 pub fn new(
110 surfnet_svm: SurfnetSvmLocker,
111 simnet_commands_tx: &Sender<SimnetCommand>,
112 plugin_manager_commands_tx: &Sender<PluginManagerCommand>,
113 config: &RpcConfig,
114 remote_rpc_client: &Option<SurfnetRemoteClient>,
115 ) -> Self {
116 Self {
117 surfnet_svm,
118 simnet_commands_tx: simnet_commands_tx.clone(),
119 plugin_manager_commands_tx: plugin_manager_commands_tx.clone(),
120 config: config.clone(),
121 remote_rpc_client: remote_rpc_client.clone(),
122 }
123 }
124}
125
126impl Middleware<Option<RunloopContext>> for SurfpoolMiddleware {
127 type Future = FutureResponse;
128 type CallFuture = middleware::NoopCallFuture;
129
130 fn on_request<F, X>(
131 &self,
132 request: Request,
133 _meta: Option<RunloopContext>,
134 next: F,
135 ) -> Either<Self::Future, X>
136 where
137 F: FnOnce(Request, Option<RunloopContext>) -> X + Send,
138 X: Future<Output = Option<Response>> + Send + 'static,
139 {
140 let meta = Some(RunloopContext {
141 id: None,
142 svm_locker: self.surfnet_svm.clone(),
143 simnet_commands_tx: self.simnet_commands_tx.clone(),
144 plugin_manager_commands_tx: self.plugin_manager_commands_tx.clone(),
145 remote_rpc_client: self.remote_rpc_client.clone(),
146 });
147 Either::Left(Box::pin(next(request, meta).map(move |res| res)))
148 }
149}
150
/// Websocket flavour of the middleware: a `SurfpoolMiddleware` plus an optional pubsub session.
#[derive(Clone)]
pub struct SurfpoolWebsocketMiddleware {
    /// Wrapped middleware whose handles are cloned into each request's context.
    pub surfpool_middleware: SurfpoolMiddleware,
    /// Session used when the incoming request metadata does not carry one.
    pub session: Option<Arc<Session>>,
}
156
157impl SurfpoolWebsocketMiddleware {
158 pub fn new(surfpool_middleware: SurfpoolMiddleware, session: Option<Arc<Session>>) -> Self {
159 Self {
160 surfpool_middleware,
161 session,
162 }
163 }
164}
165
166impl Middleware<Option<SurfpoolWebsocketMeta>> for SurfpoolWebsocketMiddleware {
167 type Future = FutureResponse;
168 type CallFuture = middleware::NoopCallFuture;
169
170 fn on_request<F, X>(
171 &self,
172 request: Request,
173 meta: Option<SurfpoolWebsocketMeta>,
174 next: F,
175 ) -> Either<Self::Future, X>
176 where
177 F: FnOnce(Request, Option<SurfpoolWebsocketMeta>) -> X + Send,
178 X: Future<Output = Option<Response>> + Send + 'static,
179 {
180 let runloop_context = RunloopContext {
181 id: None,
182 svm_locker: self.surfpool_middleware.surfnet_svm.clone(),
183 simnet_commands_tx: self.surfpool_middleware.simnet_commands_tx.clone(),
184 plugin_manager_commands_tx: self.surfpool_middleware.plugin_manager_commands_tx.clone(),
185 remote_rpc_client: self.surfpool_middleware.remote_rpc_client.clone(),
186 };
187 let session = meta
188 .as_ref()
189 .and_then(|m| m.session.clone())
190 .or(self.session.clone());
191 let meta = Some(SurfpoolWebsocketMeta::new(runloop_context, session));
192 Either::Left(Box::pin(next(request, meta).map(move |res| res)))
193 }
194}
195
/// Request metadata for websocket connections: a `RunloopContext` plus an optional pubsub
/// session.
#[derive(Clone)]
pub struct SurfpoolWebsocketMeta {
    /// Context giving access to the SVM locker, command channels and remote client.
    pub runloop_context: RunloopContext,
    /// Pubsub session returned by the `PubSubMetadata::session` implementation.
    pub session: Option<Arc<Session>>,
}
201
202impl SurfpoolWebsocketMeta {
203 pub fn new(runloop_context: RunloopContext, session: Option<Arc<Session>>) -> Self {
204 Self {
205 runloop_context,
206 session,
207 }
208 }
209}
210
211impl State for Option<SurfpoolWebsocketMeta> {
212 fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker> {
213 let Some(ctx) = self else {
214 return Err(SurfpoolError::no_locker());
215 };
216 Ok(ctx.runloop_context.svm_locker.clone())
217 }
218
219 fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
220 where
221 F: Fn(&SurfnetSvm) -> T + Send + Sync,
222 T: Send + 'static,
223 {
224 let Some(ctx) = self else {
225 return Err(SurfpoolError::no_locker());
226 };
227 Ok(ctx.runloop_context.svm_locker.with_svm_reader(reader))
228 }
229
230 fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>> {
231 let Some(ctx) = self else {
232 return Err(SurfpoolError::no_locker());
233 };
234
235 Ok(SurfnetRpcContext {
236 svm_locker: ctx.runloop_context.svm_locker.clone(),
237 remote_ctx: ctx.runloop_context.remote_rpc_client.get_remote_ctx(input),
238 })
239 }
240}
241
// Marker impl: lets `SurfpoolWebsocketMeta` be used as jsonrpc request metadata.
impl Metadata for SurfpoolWebsocketMeta {}
243impl PubSubMetadata for SurfpoolWebsocketMeta {
244 fn session(&self) -> Option<Arc<jsonrpc_pubsub::Session>> {
245 self.session.clone()
246 }
247}
248
/// Server error code attached to the errors built by the "not implemented" helpers in this file.
pub const NOT_IMPLEMENTED_CODE: i64 = -32051;

/// Generic "not implemented" message that does not name a method.
pub const NOT_IMPLEMENTED_MSG: &str = "Method not yet implemented. If this endpoint is a priority for you, please open an issue here so we can prioritize: https://github.com/txtx/surfpool/issues";
/// Builds the "not implemented" message for `method`, with the method name wrapped in backticks.
fn not_implemented_msg(method: &str) -> String {
    format!(
        "Method `{}` is not yet implemented. If this endpoint is a priority for you, please open an issue here so we can prioritize: https://github.com/txtx/surfpool/issues",
        method
    )
}
254pub fn not_implemented_err<T>(method: &str) -> Result<T, Error> {
256 Err(Error {
257 code: jsonrpc_core::types::ErrorCode::ServerError(NOT_IMPLEMENTED_CODE),
258 message: not_implemented_msg(method),
259 data: None,
260 })
261}
262
263pub fn not_implemented_err_async<T>(method: &str) -> BoxFuture<Result<T, Error>> {
264 let method = method.to_string();
265 Box::pin(async move {
266 Err(Error {
267 code: jsonrpc_core::types::ErrorCode::ServerError(NOT_IMPLEMENTED_CODE),
268 message: not_implemented_msg(&method),
269 data: None,
270 })
271 })
272}