//! sol-rpc-router 0.2.0
//!
//! Solana RPC Router that splits requests across multiple providers.
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};

use serde_json::Value;
use solana_rpc_client::api::{
    client_error::{Error as ClientError, ErrorKind, Result as RpcResult},
    request::RpcRequest,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time;

use crate::config::ProviderConfig;

/// A single RPC call waiting to be executed, bundled with the channel on which its outcome is
/// reported back.
pub struct RpcRequestWrapper {
    /// RPC method to invoke.
    pub request: RpcRequest,
    /// JSON parameters passed along with `request`.
    pub params: Value,
    /// One-shot channel that receives the result of the call (response value or client error).
    pub sender: oneshot::Sender<RpcResult<Value>>,
}

/// A single upstream RPC provider: its configuration plus the client used to talk to it.
pub struct RpcProvider {
    // Provider settings; this file reads `rpc_url`, `rate_limit` and `name` from it.
    config: ProviderConfig,
    // Non-blocking RPC client constructed from `config.rpc_url` in `new`.
    client: RpcClient,
}

/// Debug output shows only the provider's configuration; the `client` field is omitted and
/// signalled by the trailing `..` that `finish_non_exhaustive` emits.
impl fmt::Debug for RpcProvider {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Label the output with the actual type name so log lines can be traced back to it.
        f.debug_struct("RpcProvider")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

/// Receiving half of the request queue, wrapped in `Arc<Mutex<..>>` so that it can be shared;
/// a consumer must lock it before awaiting the next [`RpcRequestWrapper`].
type SharedReceiver = Arc<Mutex<mpsc::Receiver<RpcRequestWrapper>>>;

impl RpcProvider {
    pub fn new(config: ProviderConfig) -> Self {
        let client = RpcClient::new(config.rpc_url.clone());
        Self { config, client }
    }

    /// Runs loop to process incoming requests
    pub async fn run(self, shared_receiver: SharedReceiver) {
        let client = self.client;

        while let Some(request_wrapper) = Self::next_request(&shared_receiver).await {
            let request = request_wrapper.request;
            let params = request_wrapper.params;
            let sender = request_wrapper.sender;

            let request_start_time = Instant::now();
            let time_per_request = Duration::from_secs_f64(1.0 / self.config.rate_limit as f64);

            // Process the request using the RPC client
            match client.send(request, params).await {
                Ok(response) => {
                    let response_value: serde_json::Value = response;
                    let _ = sender.send(Ok(response_value));
                }
                Err(e) => {
                    log::warn!("Error processing request {}: {:?}", request, e);
                    let _ = sender.send(Err(ClientError {
                        request: None,
                        kind: Box::new(ErrorKind::Custom(e.to_string())),
                    }));
                }
            }

            let request_duration = request_start_time.elapsed();
            if request_duration < time_per_request {
                let remaining_delay = time_per_request - request_duration;
                time::sleep(remaining_delay).await;
            }
        }

        log::debug!("RpcProvider {:?} finished.", self.config.name);
    }

    async fn next_request(shared_receiver: &SharedReceiver) -> Option<RpcRequestWrapper> {
        let mut receiver = shared_receiver.lock().await;
        receiver.recv().await
    }
}