1use async_trait::async_trait;
2use bb8::{Pool, PooledConnection};
3use solana_sdk::commitment_config::CommitmentConfig;
4use std::time::Duration;
5use thiserror::Error;
6use tokio::time::sleep;
7
8use crate::rpc::{RpcConnection, RpcError};
9
10#[derive(Error, Debug)]
11pub enum PoolError {
12 #[error("Failed to create RPC client: {0}")]
13 ClientCreation(String),
14 #[error("RPC request failed: {0}")]
15 RpcRequest(#[from] RpcError),
16 #[error("Pool error: {0}")]
17 Pool(String),
18}
19
20pub struct SolanaConnectionManager<R: RpcConnection> {
21 url: String,
22 commitment: CommitmentConfig,
23 _phantom: std::marker::PhantomData<R>,
24}
25
26impl<R: RpcConnection> SolanaConnectionManager<R> {
27 pub fn new(url: String, commitment: CommitmentConfig) -> Self {
28 Self {
29 url,
30 commitment,
31 _phantom: std::marker::PhantomData,
32 }
33 }
34}
35
36#[async_trait]
37impl<R: RpcConnection> bb8::ManageConnection for SolanaConnectionManager<R> {
38 type Connection = R;
39 type Error = PoolError;
40
41 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
42 Ok(R::new(&self.url, Some(self.commitment)))
43 }
44
45 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
46 conn.health().await.map_err(PoolError::RpcRequest)
47 }
48
49 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
50 false
51 }
52}
53
54#[derive(Debug)]
55pub struct SolanaRpcPool<R: RpcConnection> {
56 pool: Pool<SolanaConnectionManager<R>>,
57}
58
59impl<R: RpcConnection> SolanaRpcPool<R> {
60 pub async fn new(
61 url: String,
62 commitment: CommitmentConfig,
63 max_size: u32,
64 ) -> Result<Self, PoolError> {
65 let manager = SolanaConnectionManager::new(url, commitment);
66 let pool = Pool::builder()
67 .max_size(max_size)
68 .connection_timeout(Duration::from_secs(15))
69 .idle_timeout(Some(Duration::from_secs(60 * 5)))
70 .build(manager)
71 .await
72 .map_err(|e| PoolError::Pool(e.to_string()))?;
73
74 Ok(Self { pool })
75 }
76
77 pub async fn get_connection(
78 &self,
79 ) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
80 self.pool
81 .get()
82 .await
83 .map_err(|e| PoolError::Pool(e.to_string()))
84 }
85
86 pub async fn get_connection_with_retry(
87 &self,
88 max_retries: u32,
89 delay: Duration,
90 ) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
91 let mut retries = 0;
92 loop {
93 match self.pool.get().await {
94 Ok(conn) => return Ok(conn),
95 Err(e) if retries < max_retries => {
96 retries += 1;
97 eprintln!("Failed to get connection (attempt {}): {:?}", retries, e);
98 sleep(delay).await;
99 }
100 Err(e) => return Err(PoolError::Pool(e.to_string())),
101 }
102 }
103 }
104}