1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use async_trait::async_trait;
use bb8::{Pool, PooledConnection};
use solana_sdk::commitment_config::CommitmentConfig;
use std::time::Duration;
use thiserror::Error;
use tokio::time::sleep;

use crate::rpc::{RpcConnection, RpcError};

#[derive(Error, Debug)]
pub enum PoolError {
    #[error("Failed to create RPC client: {0}")]
    ClientCreation(String),
    #[error("RPC request failed: {0}")]
    RpcRequest(#[from] RpcError),
    #[error("Pool error: {0}")]
    Pool(String),
}

pub struct SolanaConnectionManager<R: RpcConnection> {
    url: String,
    commitment: CommitmentConfig,
    _phantom: std::marker::PhantomData<R>,
}

impl<R: RpcConnection> SolanaConnectionManager<R> {
    pub fn new(url: String, commitment: CommitmentConfig) -> Self {
        Self {
            url,
            commitment,
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait]
impl<R: RpcConnection> bb8::ManageConnection for SolanaConnectionManager<R> {
    type Connection = R;
    type Error = PoolError;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        Ok(R::new(&self.url, Some(self.commitment)))
    }

    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
        conn.health().await.map_err(PoolError::RpcRequest)
    }

    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
        false
    }
}

#[derive(Debug)]
pub struct SolanaRpcPool<R: RpcConnection> {
    pool: Pool<SolanaConnectionManager<R>>,
}

impl<R: RpcConnection> SolanaRpcPool<R> {
    pub async fn new(
        url: String,
        commitment: CommitmentConfig,
        max_size: u32,
    ) -> Result<Self, PoolError> {
        let manager = SolanaConnectionManager::new(url, commitment);
        let pool = Pool::builder()
            .max_size(max_size)
            .connection_timeout(Duration::from_secs(15))
            .idle_timeout(Some(Duration::from_secs(60 * 5)))
            .build(manager)
            .await
            .map_err(|e| PoolError::Pool(e.to_string()))?;

        Ok(Self { pool })
    }

    pub async fn get_connection(
        &self,
    ) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
        self.pool
            .get()
            .await
            .map_err(|e| PoolError::Pool(e.to_string()))
    }

    pub async fn get_connection_with_retry(
        &self,
        max_retries: u32,
        delay: Duration,
    ) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
        let mut retries = 0;
        loop {
            match self.pool.get().await {
                Ok(conn) => return Ok(conn),
                Err(e) if retries < max_retries => {
                    retries += 1;
                    eprintln!("Failed to get connection (attempt {}): {:?}", retries, e);
                    sleep(delay).await;
                }
                Err(e) => return Err(PoolError::Pool(e.to_string())),
            }
        }
    }
}