use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use serde_json::Value;
use solana_rpc_client::api::{
client_error::{Error as ClientError, ErrorKind, Result as RpcResult},
request::RpcRequest,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time;
use crate::config::ProviderConfig;
pub struct RpcRequestWrapper {
pub request: RpcRequest,
pub params: Value,
pub sender: oneshot::Sender<RpcResult<serde_json::Value>>,
}
pub struct RpcProvider {
config: ProviderConfig,
client: RpcClient,
}
impl fmt::Debug for RpcProvider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ProviderRpcClient")
.field("config", &self.config)
.finish_non_exhaustive()
}
}
type SharedReceiver = Arc<Mutex<mpsc::Receiver<RpcRequestWrapper>>>;
impl RpcProvider {
pub fn new(config: ProviderConfig) -> Self {
let client = RpcClient::new(config.rpc_url.clone());
Self { config, client }
}
pub async fn run(self, shared_receiver: SharedReceiver) {
let client = self.client;
while let Some(request_wrapper) = Self::next_request(&shared_receiver).await {
let request = request_wrapper.request;
let params = request_wrapper.params;
let sender = request_wrapper.sender;
let request_start_time = Instant::now();
let time_per_request = Duration::from_secs_f64(1.0 / self.config.rate_limit as f64);
match client.send(request, params).await {
Ok(response) => {
let response_value: serde_json::Value = response;
let _ = sender.send(Ok(response_value));
}
Err(e) => {
log::warn!("Error processing request {}: {:?}", request, e);
let _ = sender.send(Err(ClientError {
request: None,
kind: Box::new(ErrorKind::Custom(e.to_string())),
}));
}
}
let request_duration = request_start_time.elapsed();
if request_duration < time_per_request {
let remaining_delay = time_per_request - request_duration;
time::sleep(remaining_delay).await;
}
}
log::debug!("RpcProvider {:?} finished.", self.config.name);
}
async fn next_request(shared_receiver: &SharedReceiver) -> Option<RpcRequestWrapper> {
let mut receiver = shared_receiver.lock().await;
receiver.recv().await
}
}