surfpool-core 0.9.1

Where you train before surfing Solana
Documentation
use std::{future::Future, sync::Arc};

use blake3::Hash;
use crossbeam_channel::Sender;
use jsonrpc_core::{
    futures::{future::Either, FutureExt},
    middleware, BoxFuture, Error, FutureResponse, Metadata, Middleware, Request, Response,
};
use jsonrpc_pubsub::{PubSubMetadata, Session};
use solana_clock::Slot;
use surfpool_types::{types::RpcConfig, SimnetCommand};

use crate::{
    error::{SurfpoolError, SurfpoolResult},
    surfnet::{
        locker::SurfnetSvmLocker,
        remote::{SomeRemoteCtx, SurfnetRemoteClient},
        svm::SurfnetSvm,
    },
    PluginManagerCommand,
};

pub mod accounts_data;
pub mod accounts_scan;
pub mod admin;
pub mod bank_data;
pub mod full;
pub mod minimal;
pub mod surfnet_cheatcodes;
pub mod utils;
pub mod ws;

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RpcHealthStatus {
    Ok,
    Behind { num_slots: Slot },
    Unknown,
}

pub struct SurfpoolRpc;

#[derive(Clone)]
pub struct RunloopContext {
    pub id: Option<Hash>,
    pub svm_locker: SurfnetSvmLocker,
    pub simnet_commands_tx: Sender<SimnetCommand>,
    pub plugin_manager_commands_tx: Sender<PluginManagerCommand>,
    pub remote_rpc_client: Option<SurfnetRemoteClient>,
}

pub struct SurfnetRpcContext<T> {
    pub svm_locker: SurfnetSvmLocker,
    pub remote_ctx: Option<(SurfnetRemoteClient, T)>,
}

trait State {
    fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker>;
    fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
    where
        F: Fn(&SurfnetSvm) -> T + Send + Sync,
        T: Send + 'static;
    fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>>;
}

impl State for Option<RunloopContext> {
    fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker> {
        // Retrieve svm state
        let Some(ctx) = self else {
            return Err(SurfpoolError::no_locker());
        };
        Ok(ctx.svm_locker.clone())
    }

    fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
    where
        F: Fn(&SurfnetSvm) -> T + Send + Sync,
        T: Send + 'static,
    {
        let Some(ctx) = self else {
            return Err(SurfpoolError::no_locker());
        };
        Ok(ctx.svm_locker.with_svm_reader(reader))
    }

    fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>> {
        let Some(ctx) = self else {
            return Err(SurfpoolError::no_locker());
        };

        Ok(SurfnetRpcContext {
            svm_locker: ctx.svm_locker.clone(),
            remote_ctx: ctx.remote_rpc_client.get_remote_ctx(input),
        })
    }
}

impl Metadata for RunloopContext {}

#[derive(Clone)]
pub struct SurfpoolMiddleware {
    pub surfnet_svm: SurfnetSvmLocker,
    pub simnet_commands_tx: Sender<SimnetCommand>,
    pub plugin_manager_commands_tx: Sender<PluginManagerCommand>,
    pub config: RpcConfig,
    pub remote_rpc_client: Option<SurfnetRemoteClient>,
}

impl SurfpoolMiddleware {
    pub fn new(
        surfnet_svm: SurfnetSvmLocker,
        simnet_commands_tx: &Sender<SimnetCommand>,
        plugin_manager_commands_tx: &Sender<PluginManagerCommand>,
        config: &RpcConfig,
        remote_rpc_client: &Option<SurfnetRemoteClient>,
    ) -> Self {
        Self {
            surfnet_svm,
            simnet_commands_tx: simnet_commands_tx.clone(),
            plugin_manager_commands_tx: plugin_manager_commands_tx.clone(),
            config: config.clone(),
            remote_rpc_client: remote_rpc_client.clone(),
        }
    }
}

impl Middleware<Option<RunloopContext>> for SurfpoolMiddleware {
    type Future = FutureResponse;
    type CallFuture = middleware::NoopCallFuture;

    fn on_request<F, X>(
        &self,
        request: Request,
        _meta: Option<RunloopContext>,
        next: F,
    ) -> Either<Self::Future, X>
    where
        F: FnOnce(Request, Option<RunloopContext>) -> X + Send,
        X: Future<Output = Option<Response>> + Send + 'static,
    {
        let meta = Some(RunloopContext {
            id: None,
            svm_locker: self.surfnet_svm.clone(),
            simnet_commands_tx: self.simnet_commands_tx.clone(),
            plugin_manager_commands_tx: self.plugin_manager_commands_tx.clone(),
            remote_rpc_client: self.remote_rpc_client.clone(),
        });
        Either::Left(Box::pin(next(request, meta).map(move |res| res)))
    }
}

#[derive(Clone)]
pub struct SurfpoolWebsocketMiddleware {
    pub surfpool_middleware: SurfpoolMiddleware,
    pub session: Option<Arc<Session>>,
}

impl SurfpoolWebsocketMiddleware {
    pub fn new(surfpool_middleware: SurfpoolMiddleware, session: Option<Arc<Session>>) -> Self {
        Self {
            surfpool_middleware,
            session,
        }
    }
}

impl Middleware<Option<SurfpoolWebsocketMeta>> for SurfpoolWebsocketMiddleware {
    type Future = FutureResponse;
    type CallFuture = middleware::NoopCallFuture;

    fn on_request<F, X>(
        &self,
        request: Request,
        meta: Option<SurfpoolWebsocketMeta>,
        next: F,
    ) -> Either<Self::Future, X>
    where
        F: FnOnce(Request, Option<SurfpoolWebsocketMeta>) -> X + Send,
        X: Future<Output = Option<Response>> + Send + 'static,
    {
        let runloop_context = RunloopContext {
            id: None,
            svm_locker: self.surfpool_middleware.surfnet_svm.clone(),
            simnet_commands_tx: self.surfpool_middleware.simnet_commands_tx.clone(),
            plugin_manager_commands_tx: self.surfpool_middleware.plugin_manager_commands_tx.clone(),
            remote_rpc_client: self.surfpool_middleware.remote_rpc_client.clone(),
        };
        let session = meta
            .as_ref()
            .and_then(|m| m.session.clone())
            .or(self.session.clone());
        let meta = Some(SurfpoolWebsocketMeta::new(runloop_context, session));
        Either::Left(Box::pin(next(request, meta).map(move |res| res)))
    }
}

#[derive(Clone)]
pub struct SurfpoolWebsocketMeta {
    pub runloop_context: RunloopContext,
    pub session: Option<Arc<Session>>,
}

impl SurfpoolWebsocketMeta {
    pub fn new(runloop_context: RunloopContext, session: Option<Arc<Session>>) -> Self {
        Self {
            runloop_context,
            session,
        }
    }
}

impl State for Option<SurfpoolWebsocketMeta> {
    fn get_svm_locker(&self) -> SurfpoolResult<SurfnetSvmLocker> {
        let Some(ctx) = self else {
            return Err(SurfpoolError::no_locker());
        };
        Ok(ctx.runloop_context.svm_locker.clone())
    }

    fn with_svm_reader<T, F>(&self, reader: F) -> Result<T, SurfpoolError>
    where
        F: Fn(&SurfnetSvm) -> T + Send + Sync,
        T: Send + 'static,
    {
        let Some(ctx) = self else {
            return Err(SurfpoolError::no_locker());
        };
        Ok(ctx.runloop_context.svm_locker.with_svm_reader(reader))
    }

    fn get_rpc_context<T>(&self, input: T) -> SurfpoolResult<SurfnetRpcContext<T>> {
        let Some(ctx) = self else {
            return Err(SurfpoolError::no_locker());
        };

        Ok(SurfnetRpcContext {
            svm_locker: ctx.runloop_context.svm_locker.clone(),
            remote_ctx: ctx.runloop_context.remote_rpc_client.get_remote_ctx(input),
        })
    }
}

impl Metadata for SurfpoolWebsocketMeta {}
impl PubSubMetadata for SurfpoolWebsocketMeta {
    fn session(&self) -> Option<Arc<jsonrpc_pubsub::Session>> {
        self.session.clone()
    }
}

pub const NOT_IMPLEMENTED_CODE: i64 = -32051; // -32000 to -32099 are reserved by the json-rpc spec for custom errors
pub const NOT_IMPLEMENTED_MSG: &str = "Method not yet implemented. If this endpoint is a priority for you, please open an issue here so we can prioritize: https://github.com/txtx/surfpool/issues";
fn not_implemented_msg(method: &str) -> String {
    format!("Method `{}` is not yet implemented. If this endpoint is a priority for you, please open an issue here so we can prioritize: https://github.com/txtx/surfpool/issues", method)
}
/// Helper function to return a `NotImplemented` JSON RPC error
pub fn not_implemented_err<T>(method: &str) -> Result<T, Error> {
    Err(Error {
        code: jsonrpc_core::types::ErrorCode::ServerError(NOT_IMPLEMENTED_CODE),
        message: not_implemented_msg(method),
        data: None,
    })
}

pub fn not_implemented_err_async<T>(method: &str) -> BoxFuture<Result<T, Error>> {
    let method = method.to_string();
    Box::pin(async move {
        Err(Error {
            code: jsonrpc_core::types::ErrorCode::ServerError(NOT_IMPLEMENTED_CODE),
            message: not_implemented_msg(&method),
            data: None,
        })
    })
}