light_client/
rpc_pool.rs

1use async_trait::async_trait;
2use bb8::{Pool, PooledConnection};
3use solana_sdk::commitment_config::CommitmentConfig;
4use std::time::Duration;
5use thiserror::Error;
6use tokio::time::sleep;
7
8use crate::rpc::{RpcConnection, RpcError};
9
10#[derive(Error, Debug)]
11pub enum PoolError {
12    #[error("Failed to create RPC client: {0}")]
13    ClientCreation(String),
14    #[error("RPC request failed: {0}")]
15    RpcRequest(#[from] RpcError),
16    #[error("Pool error: {0}")]
17    Pool(String),
18}
19
20pub struct SolanaConnectionManager<R: RpcConnection> {
21    url: String,
22    commitment: CommitmentConfig,
23    _phantom: std::marker::PhantomData<R>,
24}
25
26impl<R: RpcConnection> SolanaConnectionManager<R> {
27    pub fn new(url: String, commitment: CommitmentConfig) -> Self {
28        Self {
29            url,
30            commitment,
31            _phantom: std::marker::PhantomData,
32        }
33    }
34}
35
36#[async_trait]
37impl<R: RpcConnection> bb8::ManageConnection for SolanaConnectionManager<R> {
38    type Connection = R;
39    type Error = PoolError;
40
41    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
42        Ok(R::new(&self.url, Some(self.commitment)))
43    }
44
45    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
46        conn.health().await.map_err(PoolError::RpcRequest)
47    }
48
49    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
50        false
51    }
52}
53
54#[derive(Debug)]
55pub struct SolanaRpcPool<R: RpcConnection> {
56    pool: Pool<SolanaConnectionManager<R>>,
57}
58
59impl<R: RpcConnection> SolanaRpcPool<R> {
60    pub async fn new(
61        url: String,
62        commitment: CommitmentConfig,
63        max_size: u32,
64    ) -> Result<Self, PoolError> {
65        let manager = SolanaConnectionManager::new(url, commitment);
66        let pool = Pool::builder()
67            .max_size(max_size)
68            .connection_timeout(Duration::from_secs(15))
69            .idle_timeout(Some(Duration::from_secs(60 * 5)))
70            .build(manager)
71            .await
72            .map_err(|e| PoolError::Pool(e.to_string()))?;
73
74        Ok(Self { pool })
75    }
76
77    pub async fn get_connection(
78        &self,
79    ) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
80        self.pool
81            .get()
82            .await
83            .map_err(|e| PoolError::Pool(e.to_string()))
84    }
85
86    pub async fn get_connection_with_retry(
87        &self,
88        max_retries: u32,
89        delay: Duration,
90    ) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
91        let mut retries = 0;
92        loop {
93            match self.pool.get().await {
94                Ok(conn) => return Ok(conn),
95                Err(e) if retries < max_retries => {
96                    retries += 1;
97                    eprintln!("Failed to get connection (attempt {}): {:?}", retries, e);
98                    sleep(delay).await;
99                }
100                Err(e) => return Err(PoolError::Pool(e.to_string())),
101            }
102        }
103    }
104}