Skip to main content

kora_lib/
cache.rs

1use deadpool_redis::{Pool, Runtime};
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_sdk::{account::Account, pubkey::Pubkey};
6use tokio::sync::OnceCell;
7
8use crate::{error::KoraError, sanitize_error};
9
10#[cfg(not(test))]
11use crate::state::get_config;
12
13#[cfg(test)]
14use crate::tests::config_mock::mock_state::get_config;
15
16const ACCOUNT_CACHE_KEY: &str = "account";
17
18/// Global cache pool instance
19static CACHE_POOL: OnceCell<Option<Pool>> = OnceCell::const_new();
20
21/// Cached account data with metadata
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CachedAccount {
24    pub account: Account,
25    pub cached_at: i64, // Unix timestamp
26}
27
28/// Cache utility for Solana RPC calls
29pub struct CacheUtil;
30
31impl CacheUtil {
32    /// Initialize the cache pool based on configuration
33    pub async fn init() -> Result<(), KoraError> {
34        let config = get_config()?;
35
36        let pool = if CacheUtil::is_cache_enabled() {
37            let redis_url = config.kora.cache.url.as_ref().ok_or(KoraError::ConfigError)?;
38
39            let cfg = deadpool_redis::Config::from_url(redis_url);
40            let pool = cfg.create_pool(Some(Runtime::Tokio1)).map_err(|e| {
41                KoraError::InternalServerError(format!(
42                    "Failed to create cache pool: {}",
43                    sanitize_error!(e)
44                ))
45            })?;
46
47            // Test connection
48            let mut conn = pool.get().await.map_err(|e| {
49                KoraError::InternalServerError(format!(
50                    "Failed to connect to cache: {}",
51                    sanitize_error!(e)
52                ))
53            })?;
54
55            // Simple connection test - try to get a non-existent key
56            let _: Option<String> = conn.get("__connection_test__").await.map_err(|e| {
57                KoraError::InternalServerError(format!(
58                    "Cache connection test failed: {}",
59                    sanitize_error!(e)
60                ))
61            })?;
62
63            log::info!("Cache initialized successfully");
64
65            Some(pool)
66        } else {
67            log::info!("Cache disabled or no URL configured");
68            None
69        };
70
71        CACHE_POOL.set(pool).map_err(|_| {
72            KoraError::InternalServerError("Cache pool already initialized".to_string())
73        })?;
74
75        Ok(())
76    }
77
78    async fn get_connection(pool: &Pool) -> Result<deadpool_redis::Connection, KoraError> {
79        pool.get().await.map_err(|e| {
80            KoraError::InternalServerError(format!(
81                "Failed to get cache connection: {}",
82                sanitize_error!(e)
83            ))
84        })
85    }
86
87    fn get_account_key(pubkey: &Pubkey) -> String {
88        format!("{ACCOUNT_CACHE_KEY}:{pubkey}")
89    }
90
91    /// Get account directly from RPC (bypassing cache)
92    async fn get_account_from_rpc(
93        rpc_client: &RpcClient,
94        pubkey: &Pubkey,
95    ) -> Result<Account, KoraError> {
96        match rpc_client.get_account(pubkey).await {
97            Ok(account) => Ok(account),
98            Err(e) => {
99                let kora_error = e.into();
100                match kora_error {
101                    KoraError::AccountNotFound(_) => {
102                        Err(KoraError::AccountNotFound(pubkey.to_string()))
103                    }
104                    other_error => Err(other_error),
105                }
106            }
107        }
108    }
109
110    /// Get data from cache
111    async fn get_from_cache(pool: &Pool, key: &str) -> Result<Option<CachedAccount>, KoraError> {
112        let mut conn = Self::get_connection(pool).await?;
113
114        let cached_data: Option<String> = conn.get(key).await.map_err(|e| {
115            KoraError::InternalServerError(format!(
116                "Failed to get from cache: {}",
117                sanitize_error!(e)
118            ))
119        })?;
120
121        match cached_data {
122            Some(data) => {
123                let cached_account: CachedAccount = serde_json::from_str(&data).map_err(|e| {
124                    KoraError::InternalServerError(format!(
125                        "Failed to deserialize cached data: {e}"
126                    ))
127                })?;
128                Ok(Some(cached_account))
129            }
130            None => Ok(None),
131        }
132    }
133
134    /// Get account from RPC and cache it
135    async fn get_account_from_rpc_and_cache(
136        rpc_client: &RpcClient,
137        pubkey: &Pubkey,
138        pool: &Pool,
139        ttl: u64,
140    ) -> Result<Account, KoraError> {
141        let account = Self::get_account_from_rpc(rpc_client, pubkey).await?;
142
143        let cache_key = Self::get_account_key(pubkey);
144        let cached_account =
145            CachedAccount { account: account.clone(), cached_at: chrono::Utc::now().timestamp() };
146
147        if let Err(e) = Self::set_in_cache(pool, &cache_key, &cached_account, ttl).await {
148            log::warn!("Failed to cache account {pubkey}: {e}");
149            // Don't fail the request if caching fails
150        }
151
152        Ok(account)
153    }
154
155    /// Set data in cache with TTL
156    async fn set_in_cache(
157        pool: &Pool,
158        key: &str,
159        data: &CachedAccount,
160        ttl_seconds: u64,
161    ) -> Result<(), KoraError> {
162        let mut conn = Self::get_connection(pool).await?;
163
164        let serialized = serde_json::to_string(data).map_err(|e| {
165            KoraError::InternalServerError(format!(
166                "Failed to serialize cache data: {}",
167                sanitize_error!(e)
168            ))
169        })?;
170
171        conn.set_ex::<_, _, ()>(key, serialized, ttl_seconds).await.map_err(|e| {
172            KoraError::InternalServerError(format!(
173                "Failed to set cache data: {}",
174                sanitize_error!(e)
175            ))
176        })?;
177
178        Ok(())
179    }
180
181    /// Check if cache is enabled and available
182    fn is_cache_enabled() -> bool {
183        match get_config() {
184            Ok(config) => config.kora.cache.enabled && config.kora.cache.url.is_some(),
185            Err(_) => false,
186        }
187    }
188
189    /// Get account from cache with optional force refresh
190    pub async fn get_account(
191        rpc_client: &RpcClient,
192        pubkey: &Pubkey,
193        force_refresh: bool,
194    ) -> Result<Account, KoraError> {
195        let config = get_config()?;
196
197        // If cache is disabled or force refresh is requested, go directly to RPC
198        if !CacheUtil::is_cache_enabled() {
199            return Self::get_account_from_rpc(rpc_client, pubkey).await;
200        }
201
202        // Get cache pool - if not initialized, fallback to RPC
203        let pool = match CACHE_POOL.get() {
204            Some(pool) => pool,
205            None => {
206                // Cache not initialized, fallback to RPC
207                return Self::get_account_from_rpc(rpc_client, pubkey).await;
208            }
209        };
210
211        let pool = match pool {
212            Some(pool) => pool,
213            None => {
214                // Cache disabled, fallback to RPC
215                return Self::get_account_from_rpc(rpc_client, pubkey).await;
216            }
217        };
218
219        if force_refresh {
220            return Self::get_account_from_rpc_and_cache(
221                rpc_client,
222                pubkey,
223                pool,
224                config.kora.cache.account_ttl,
225            )
226            .await;
227        }
228
229        let cache_key = Self::get_account_key(pubkey);
230
231        // Try to get from cache first
232        if let Ok(Some(cached_account)) = Self::get_from_cache(pool, &cache_key).await {
233            let current_time = chrono::Utc::now().timestamp();
234            let cache_age = current_time - cached_account.cached_at;
235
236            // Check if cache is still valid
237            if cache_age < config.kora.cache.account_ttl as i64 {
238                return Ok(cached_account.account);
239            }
240        }
241
242        // Cache miss or expired, fetch from RPC
243        let account = Self::get_account_from_rpc_and_cache(
244            rpc_client,
245            pubkey,
246            pool,
247            config.kora.cache.account_ttl,
248        )
249        .await?;
250
251        Ok(account)
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258    use crate::tests::{
259        common::{create_mock_token_account, RpcMockBuilder},
260        config_mock::ConfigMockBuilder,
261    };
262
263    #[tokio::test]
264    async fn test_is_cache_enabled_disabled() {
265        let _m = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();
266
267        assert!(!CacheUtil::is_cache_enabled());
268    }
269
270    #[tokio::test]
271    async fn test_is_cache_enabled_no_url() {
272        let _m = ConfigMockBuilder::new()
273            .with_cache_enabled(true)
274            .with_cache_url(None) // Explicitly set no URL
275            .build_and_setup();
276
277        // Without URL, cache should be disabled
278        assert!(!CacheUtil::is_cache_enabled());
279    }
280
281    #[tokio::test]
282    async fn test_is_cache_enabled_with_url() {
283        let _m = ConfigMockBuilder::new()
284            .with_cache_enabled(true)
285            .with_cache_url(Some("redis://localhost:6379".to_string()))
286            .build_and_setup();
287
288        // Give time for config to be set up
289        assert!(CacheUtil::is_cache_enabled());
290    }
291
292    #[tokio::test]
293    async fn test_get_account_key_format() {
294        let pubkey = Pubkey::new_unique();
295        let key = CacheUtil::get_account_key(&pubkey);
296        assert_eq!(key, format!("account:{pubkey}"));
297    }
298
299    #[tokio::test]
300    async fn test_get_account_from_rpc_success() {
301        let pubkey = Pubkey::new_unique();
302        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
303
304        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();
305
306        let result = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;
307
308        assert!(result.is_ok());
309        let account = result.unwrap();
310        assert_eq!(account.lamports, expected_account.lamports);
311        assert_eq!(account.owner, expected_account.owner);
312    }
313
314    #[tokio::test]
315    async fn test_get_account_from_rpc_error() {
316        let pubkey = Pubkey::new_unique();
317        let rpc_client = RpcMockBuilder::new().with_account_not_found().build();
318
319        let result = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;
320
321        assert!(result.is_err());
322        match result.unwrap_err() {
323            KoraError::AccountNotFound(account_key) => {
324                assert_eq!(account_key, pubkey.to_string());
325            }
326            _ => panic!("Expected AccountNotFound for account not found error"),
327        }
328    }
329
330    #[tokio::test]
331    async fn test_get_account_cache_disabled_fallback_to_rpc() {
332        let _m = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();
333
334        let pubkey = Pubkey::new_unique();
335        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
336
337        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();
338
339        let result = CacheUtil::get_account(&rpc_client, &pubkey, false).await;
340
341        assert!(result.is_ok());
342        let account = result.unwrap();
343        assert_eq!(account.lamports, expected_account.lamports);
344    }
345
346    #[tokio::test]
347    async fn test_get_account_force_refresh_bypasses_cache() {
348        let _m = ConfigMockBuilder::new()
349            .with_cache_enabled(false) // Force RPC fallback for simplicity
350            .build_and_setup();
351
352        let pubkey = Pubkey::new_unique();
353        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
354
355        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();
356
357        // force_refresh = true should always go to RPC
358        let result = CacheUtil::get_account(&rpc_client, &pubkey, true).await;
359
360        assert!(result.is_ok());
361        let account = result.unwrap();
362        assert_eq!(account.lamports, expected_account.lamports);
363    }
364}