kora_lib/
cache.rs

1use deadpool_redis::{Pool, Runtime};
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_sdk::{account::Account, pubkey::Pubkey};
6use tokio::sync::OnceCell;
7
8use crate::{config::Config, error::KoraError, sanitize_error};
9
10#[cfg(not(test))]
11use crate::state::get_config;
12
13#[cfg(test)]
14use crate::tests::config_mock::mock_state::get_config;
15
16const ACCOUNT_CACHE_KEY: &str = "account";
17
18/// Global cache pool instance
19static CACHE_POOL: OnceCell<Option<Pool>> = OnceCell::const_new();
20
21/// Cached account data with metadata
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CachedAccount {
24    pub account: Account,
25    pub cached_at: i64, // Unix timestamp
26}
27
/// Namespace for the account-cache helpers; carries no state of its own.
///
/// The shared connection pool lives in the module-level `CACHE_POOL`.
pub struct CacheUtil;
30
31impl CacheUtil {
32    /// Initialize the cache pool based on configuration
33    pub async fn init() -> Result<(), KoraError> {
34        let config = get_config()?;
35
36        #[allow(clippy::needless_borrow)]
37        let pool = if CacheUtil::is_cache_enabled(&config) {
38            let redis_url = config.kora.cache.url.as_ref().ok_or(KoraError::ConfigError)?;
39
40            let cfg = deadpool_redis::Config::from_url(redis_url);
41            let pool = cfg.create_pool(Some(Runtime::Tokio1)).map_err(|e| {
42                KoraError::InternalServerError(format!(
43                    "Failed to create cache pool: {}",
44                    sanitize_error!(e)
45                ))
46            })?;
47
48            // Test connection
49            let mut conn = pool.get().await.map_err(|e| {
50                KoraError::InternalServerError(format!(
51                    "Failed to connect to cache: {}",
52                    sanitize_error!(e)
53                ))
54            })?;
55
56            // Simple connection test - try to get a non-existent key
57            let _: Option<String> = conn.get("__connection_test__").await.map_err(|e| {
58                KoraError::InternalServerError(format!(
59                    "Cache connection test failed: {}",
60                    sanitize_error!(e)
61                ))
62            })?;
63
64            log::info!("Cache initialized successfully");
65
66            Some(pool)
67        } else {
68            log::info!("Cache disabled or no URL configured");
69            None
70        };
71
72        CACHE_POOL.set(pool).map_err(|_| {
73            KoraError::InternalServerError("Cache pool already initialized".to_string())
74        })?;
75
76        Ok(())
77    }
78
79    async fn get_connection(pool: &Pool) -> Result<deadpool_redis::Connection, KoraError> {
80        pool.get().await.map_err(|e| {
81            KoraError::InternalServerError(format!(
82                "Failed to get cache connection: {}",
83                sanitize_error!(e)
84            ))
85        })
86    }
87
88    fn get_account_key(pubkey: &Pubkey) -> String {
89        format!("{ACCOUNT_CACHE_KEY}:{pubkey}")
90    }
91
92    /// Get account directly from RPC (bypassing cache)
93    async fn get_account_from_rpc(
94        rpc_client: &RpcClient,
95        pubkey: &Pubkey,
96    ) -> Result<Account, KoraError> {
97        match rpc_client.get_account(pubkey).await {
98            Ok(account) => Ok(account),
99            Err(e) => {
100                let kora_error = e.into();
101                match kora_error {
102                    KoraError::AccountNotFound(_) => {
103                        Err(KoraError::AccountNotFound(pubkey.to_string()))
104                    }
105                    other_error => Err(other_error),
106                }
107            }
108        }
109    }
110
111    /// Get data from cache
112    async fn get_from_cache(pool: &Pool, key: &str) -> Result<Option<CachedAccount>, KoraError> {
113        let mut conn = Self::get_connection(pool).await?;
114
115        let cached_data: Option<String> = conn.get(key).await.map_err(|e| {
116            KoraError::InternalServerError(format!(
117                "Failed to get from cache: {}",
118                sanitize_error!(e)
119            ))
120        })?;
121
122        match cached_data {
123            Some(data) => {
124                let cached_account: CachedAccount = serde_json::from_str(&data).map_err(|e| {
125                    KoraError::InternalServerError(format!(
126                        "Failed to deserialize cached data: {e}"
127                    ))
128                })?;
129                Ok(Some(cached_account))
130            }
131            None => Ok(None),
132        }
133    }
134
135    /// Get account from RPC and cache it
136    async fn get_account_from_rpc_and_cache(
137        rpc_client: &RpcClient,
138        pubkey: &Pubkey,
139        pool: &Pool,
140        ttl: u64,
141    ) -> Result<Account, KoraError> {
142        let account = Self::get_account_from_rpc(rpc_client, pubkey).await?;
143
144        let cache_key = Self::get_account_key(pubkey);
145        let cached_account =
146            CachedAccount { account: account.clone(), cached_at: chrono::Utc::now().timestamp() };
147
148        if let Err(e) = Self::set_in_cache(pool, &cache_key, &cached_account, ttl).await {
149            log::warn!("Failed to cache account {pubkey}: {e}");
150            // Don't fail the request if caching fails
151        }
152
153        Ok(account)
154    }
155
156    /// Set data in cache with TTL
157    async fn set_in_cache(
158        pool: &Pool,
159        key: &str,
160        data: &CachedAccount,
161        ttl_seconds: u64,
162    ) -> Result<(), KoraError> {
163        let mut conn = Self::get_connection(pool).await?;
164
165        let serialized = serde_json::to_string(data).map_err(|e| {
166            KoraError::InternalServerError(format!(
167                "Failed to serialize cache data: {}",
168                sanitize_error!(e)
169            ))
170        })?;
171
172        conn.set_ex::<_, _, ()>(key, serialized, ttl_seconds).await.map_err(|e| {
173            KoraError::InternalServerError(format!(
174                "Failed to set cache data: {}",
175                sanitize_error!(e)
176            ))
177        })?;
178
179        Ok(())
180    }
181
182    /// Check if cache is enabled and available
183    fn is_cache_enabled(config: &Config) -> bool {
184        config.kora.cache.enabled && config.kora.cache.url.is_some()
185    }
186
187    /// Get account from cache with optional force refresh
188    pub async fn get_account(
189        config: &Config,
190        rpc_client: &RpcClient,
191        pubkey: &Pubkey,
192        force_refresh: bool,
193    ) -> Result<Account, KoraError> {
194        // If cache is disabled or force refresh is requested, go directly to RPC
195        if !CacheUtil::is_cache_enabled(config) {
196            return Self::get_account_from_rpc(rpc_client, pubkey).await;
197        }
198
199        // Get cache pool - if not initialized, fallback to RPC
200        let pool = match CACHE_POOL.get() {
201            Some(pool) => pool,
202            None => {
203                // Cache not initialized, fallback to RPC
204                return Self::get_account_from_rpc(rpc_client, pubkey).await;
205            }
206        };
207
208        let pool = match pool {
209            Some(pool) => pool,
210            None => {
211                // Cache disabled, fallback to RPC
212                return Self::get_account_from_rpc(rpc_client, pubkey).await;
213            }
214        };
215
216        if force_refresh {
217            return Self::get_account_from_rpc_and_cache(
218                rpc_client,
219                pubkey,
220                pool,
221                config.kora.cache.account_ttl,
222            )
223            .await;
224        }
225
226        let cache_key = Self::get_account_key(pubkey);
227
228        // Try to get from cache first
229        if let Ok(Some(cached_account)) = Self::get_from_cache(pool, &cache_key).await {
230            let current_time = chrono::Utc::now().timestamp();
231            let cache_age = current_time - cached_account.cached_at;
232
233            // Check if cache is still valid
234            if cache_age < config.kora.cache.account_ttl as i64 {
235                return Ok(cached_account.account);
236            }
237        }
238
239        // Cache miss or expired, fetch from RPC
240        let account = Self::get_account_from_rpc_and_cache(
241            rpc_client,
242            pubkey,
243            pool,
244            config.kora.cache.account_ttl,
245        )
246        .await?;
247
248        Ok(account)
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::{
        common::{create_mock_token_account, RpcMockBuilder},
        config_mock::ConfigMockBuilder,
    };

    /// `enabled = false` means the cache is reported as disabled.
    #[tokio::test]
    async fn test_is_cache_enabled_disabled() {
        let _mock = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let cfg = get_config().unwrap();
        assert!(!CacheUtil::is_cache_enabled(&cfg));
    }

    /// `enabled = true` without a URL is still reported as disabled.
    #[tokio::test]
    async fn test_is_cache_enabled_no_url() {
        let _mock =
            ConfigMockBuilder::new().with_cache_enabled(true).with_cache_url(None).build_and_setup();

        let cfg = get_config().unwrap();
        assert!(!CacheUtil::is_cache_enabled(&cfg));
    }

    /// `enabled = true` together with a URL is reported as enabled.
    #[tokio::test]
    async fn test_is_cache_enabled_with_url() {
        let _mock = ConfigMockBuilder::new()
            .with_cache_enabled(true)
            .with_cache_url(Some("redis://localhost:6379".to_string()))
            .build_and_setup();

        let cfg = get_config().unwrap();
        assert!(CacheUtil::is_cache_enabled(&cfg));
    }

    /// Keys have the form `account:<pubkey>`.
    #[tokio::test]
    async fn test_get_account_key_format() {
        let pubkey = Pubkey::new_unique();
        let expected_key = format!("account:{pubkey}");

        assert_eq!(CacheUtil::get_account_key(&pubkey), expected_key);
    }

    /// A successful RPC lookup returns the account supplied by the mock.
    #[tokio::test]
    async fn test_get_account_from_rpc_success() {
        let pubkey = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let result = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;

        assert!(result.is_ok());
        let fetched = result.unwrap();
        assert_eq!(fetched.lamports, expected_account.lamports);
        assert_eq!(fetched.owner, expected_account.owner);
    }

    /// A not-found RPC response surfaces as `AccountNotFound` carrying the pubkey.
    #[tokio::test]
    async fn test_get_account_from_rpc_error() {
        let pubkey = Pubkey::new_unique();
        let rpc_client = RpcMockBuilder::new().with_account_not_found().build();

        let result = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;

        assert!(result.is_err());
        if let KoraError::AccountNotFound(account_key) = result.unwrap_err() {
            assert_eq!(account_key, pubkey.to_string());
        } else {
            panic!("Expected AccountNotFound for account not found error");
        }
    }

    /// With caching disabled, `get_account` is served by RPC.
    #[tokio::test]
    async fn test_get_account_cache_disabled_fallback_to_rpc() {
        let _mock = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let cfg = get_config().unwrap();
        let result = CacheUtil::get_account(&cfg, &rpc_client, &pubkey, false).await;

        assert!(result.is_ok());
        let fetched = result.unwrap();
        assert_eq!(fetched.lamports, expected_account.lamports);
    }

    /// `force_refresh = true` with caching disabled is served by RPC.
    ///
    /// Caching is switched off here, so this covers only the RPC fallback
    /// path, not the refresh-and-rewrite path taken when a pool exists.
    #[tokio::test]
    async fn test_get_account_force_refresh_bypasses_cache() {
        let _mock = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let cfg = get_config().unwrap();
        let result = CacheUtil::get_account(&cfg, &rpc_client, &pubkey, true).await;

        assert!(result.is_ok());
        let fetched = result.unwrap();
        assert_eq!(fetched.lamports, expected_account.lamports);
    }
}