Skip to main content

kora_lib/
cache.rs

1use deadpool_redis::{Pool, Runtime};
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_commitment_config::CommitmentConfig;
6use solana_sdk::{account::Account, hash::Hash, pubkey::Pubkey};
7use std::str::FromStr;
8use tokio::sync::OnceCell;
9
10use crate::{config::Config, error::KoraError, sanitize_error};
11
12#[cfg(not(test))]
13use crate::state::get_config;
14
15#[cfg(test)]
16use crate::tests::config_mock::mock_state::get_config;
17
/// Key prefix for cached account entries; `CacheUtil::get_account_key` appends
/// `:<pubkey>` to it.
const ACCOUNT_CACHE_KEY: &str = "account";
/// Redis key under which the most recently fetched blockhash is stored.
const BLOCKHASH_CACHE_KEY: &str = "kora:blockhash";
/// TTL for cached blockhash in seconds. Blockhashes are valid for ~60s,
/// but we use a short TTL to keep the hash fresh.
const BLOCKHASH_TTL: u64 = 5;

/// Global cache pool instance.
///
/// Unset until `CacheUtil::init` runs. Afterwards it holds `Some(pool)` when
/// caching is enabled and `None` when it is disabled.
static CACHE_POOL: OnceCell<Option<Pool>> = OnceCell::const_new();
26
/// Cached account data with metadata.
///
/// Stored in the cache as a JSON string (see `CacheUtil::set_in_cache`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedAccount {
    /// The account as returned by the RPC node at fetch time.
    pub account: Account,
    /// Unix timestamp (seconds) taken when the entry was written; compared against
    /// the configured account TTL when the entry is read back.
    pub cached_at: i64, // Unix timestamp
}
33
/// Cache utility for Solana RPC calls.
///
/// A stateless namespace type: every function is associated (no `self`) and the
/// shared state lives in the module-level `CACHE_POOL`.
pub struct CacheUtil;
36
37impl CacheUtil {
38    /// Initialize the cache pool based on configuration
39    pub async fn init() -> Result<(), KoraError> {
40        let config = get_config()?;
41
42        #[allow(clippy::needless_borrow)]
43        let pool = if CacheUtil::is_cache_enabled(&config) {
44            let redis_url = config.kora.cache.url.as_ref().ok_or(KoraError::ConfigError(
45                "Redis URL is required when cache is enabled. Set redis_url in config".to_string(),
46            ))?;
47
48            let cfg = deadpool_redis::Config::from_url(redis_url);
49            let pool = cfg.create_pool(Some(Runtime::Tokio1)).map_err(|e| {
50                KoraError::InternalServerError(format!(
51                    "Failed to create cache pool: {}",
52                    sanitize_error!(e)
53                ))
54            })?;
55
56            // Test connection
57            let mut conn = pool.get().await.map_err(|e| {
58                KoraError::InternalServerError(format!(
59                    "Failed to connect to cache: {}",
60                    sanitize_error!(e)
61                ))
62            })?;
63
64            // Simple connection test - try to get a non-existent key
65            let _: Option<String> = conn.get("__connection_test__").await.map_err(|e| {
66                KoraError::InternalServerError(format!(
67                    "Cache connection test failed: {}",
68                    sanitize_error!(e)
69                ))
70            })?;
71
72            log::info!("Cache initialized successfully");
73
74            Some(pool)
75        } else {
76            log::info!("Cache disabled or no URL configured");
77            None
78        };
79
80        CACHE_POOL.set(pool).map_err(|_| {
81            KoraError::InternalServerError("Cache pool already initialized".to_string())
82        })?;
83
84        Ok(())
85    }
86
87    async fn get_connection(pool: &Pool) -> Result<deadpool_redis::Connection, KoraError> {
88        pool.get().await.map_err(|e| {
89            KoraError::InternalServerError(format!(
90                "Failed to get cache connection: {}",
91                sanitize_error!(e)
92            ))
93        })
94    }
95
96    fn get_account_key(pubkey: &Pubkey) -> String {
97        format!("{ACCOUNT_CACHE_KEY}:{pubkey}")
98    }
99
100    /// Get account directly from RPC (bypassing cache)
101    async fn get_account_from_rpc(
102        rpc_client: &RpcClient,
103        pubkey: &Pubkey,
104    ) -> Result<Account, KoraError> {
105        match rpc_client.get_account(pubkey).await {
106            Ok(account) => Ok(account),
107            Err(e) => {
108                let kora_error = e.into();
109                match kora_error {
110                    KoraError::AccountNotFound(_) => {
111                        Err(KoraError::AccountNotFound(pubkey.to_string()))
112                    }
113                    other_error => Err(other_error),
114                }
115            }
116        }
117    }
118
119    /// Get data from cache
120    async fn get_from_cache(pool: &Pool, key: &str) -> Result<Option<CachedAccount>, KoraError> {
121        let mut conn = Self::get_connection(pool).await?;
122
123        let cached_data: Option<String> = conn.get(key).await.map_err(|e| {
124            KoraError::InternalServerError(format!(
125                "Failed to get from cache: {}",
126                sanitize_error!(e)
127            ))
128        })?;
129
130        match cached_data {
131            Some(data) => {
132                let cached_account: CachedAccount = serde_json::from_str(&data).map_err(|e| {
133                    KoraError::InternalServerError(format!(
134                        "Failed to deserialize cached data: {e}"
135                    ))
136                })?;
137                Ok(Some(cached_account))
138            }
139            None => Ok(None),
140        }
141    }
142
143    /// Get account from RPC and cache it
144    async fn get_account_from_rpc_and_cache(
145        rpc_client: &RpcClient,
146        pubkey: &Pubkey,
147        pool: &Pool,
148        ttl: u64,
149    ) -> Result<Account, KoraError> {
150        let account = Self::get_account_from_rpc(rpc_client, pubkey).await?;
151
152        let cache_key = Self::get_account_key(pubkey);
153        let cached_account =
154            CachedAccount { account: account.clone(), cached_at: chrono::Utc::now().timestamp() };
155
156        if let Err(e) = Self::set_in_cache(pool, &cache_key, &cached_account, ttl).await {
157            log::warn!("Failed to cache account {pubkey}: {e}");
158            // Don't fail the request if caching fails
159        }
160
161        Ok(account)
162    }
163
164    /// Set data in cache with TTL
165    async fn set_in_cache(
166        pool: &Pool,
167        key: &str,
168        data: &CachedAccount,
169        ttl_seconds: u64,
170    ) -> Result<(), KoraError> {
171        let mut conn = Self::get_connection(pool).await?;
172
173        let serialized = serde_json::to_string(data).map_err(|e| {
174            KoraError::InternalServerError(format!(
175                "Failed to serialize cache data: {}",
176                sanitize_error!(e)
177            ))
178        })?;
179
180        conn.set_ex::<_, _, ()>(key, serialized, ttl_seconds).await.map_err(|e| {
181            KoraError::InternalServerError(format!(
182                "Failed to set cache data: {}",
183                sanitize_error!(e)
184            ))
185        })?;
186
187        Ok(())
188    }
189
190    /// Check if cache is enabled and available
191    fn is_cache_enabled(config: &Config) -> bool {
192        config.kora.cache.enabled && config.kora.cache.url.is_some()
193    }
194
195    /// Get account from cache with optional force refresh
196    pub async fn get_account(
197        config: &Config,
198        rpc_client: &RpcClient,
199        pubkey: &Pubkey,
200        force_refresh: bool,
201    ) -> Result<Account, KoraError> {
202        // If cache is disabled or force refresh is requested, go directly to RPC
203        if !CacheUtil::is_cache_enabled(config) {
204            return Self::get_account_from_rpc(rpc_client, pubkey).await;
205        }
206
207        // Get cache pool - if not initialized, fallback to RPC
208        let pool = match CACHE_POOL.get() {
209            Some(pool) => pool,
210            None => {
211                // Cache not initialized, fallback to RPC
212                return Self::get_account_from_rpc(rpc_client, pubkey).await;
213            }
214        };
215
216        let pool = match pool {
217            Some(pool) => pool,
218            None => {
219                // Cache disabled, fallback to RPC
220                return Self::get_account_from_rpc(rpc_client, pubkey).await;
221            }
222        };
223
224        if force_refresh {
225            return Self::get_account_from_rpc_and_cache(
226                rpc_client,
227                pubkey,
228                pool,
229                config.kora.cache.account_ttl,
230            )
231            .await;
232        }
233
234        let cache_key = Self::get_account_key(pubkey);
235
236        // Try to get from cache first
237        if let Ok(Some(cached_account)) = Self::get_from_cache(pool, &cache_key).await {
238            let current_time = chrono::Utc::now().timestamp();
239            let cache_age = current_time - cached_account.cached_at;
240
241            // Check if cache is still valid
242            if cache_age < config.kora.cache.account_ttl as i64 {
243                return Ok(cached_account.account);
244            }
245        }
246
247        // Cache miss or expired, fetch from RPC
248        let account = Self::get_account_from_rpc_and_cache(
249            rpc_client,
250            pubkey,
251            pool,
252            config.kora.cache.account_ttl,
253        )
254        .await?;
255
256        Ok(account)
257    }
258
259    /// Get the latest blockhash, using Redis cache when available.
260    ///
261    /// Reduces RPC load by caching the blockhash with a short TTL (5s).
262    /// If the cache is unavailable or errors occur, falls back to a direct RPC call.
263    pub async fn get_or_fetch_latest_blockhash(
264        config: &Config,
265        rpc_client: &RpcClient,
266    ) -> Result<Hash, KoraError> {
267        // If cache is disabled, fetch directly from RPC
268        if !CacheUtil::is_cache_enabled(config) {
269            return Self::fetch_blockhash_from_rpc(rpc_client).await;
270        }
271
272        // Get cache pool - if not initialized, fallback to RPC
273        let pool = match CACHE_POOL.get() {
274            Some(Some(pool)) => pool,
275            _ => return Self::fetch_blockhash_from_rpc(rpc_client).await,
276        };
277
278        // Try to get from cache first
279        match Self::get_blockhash_from_cache(pool).await {
280            Ok(Some(hash)) => return Ok(hash),
281            Ok(None) => { /* cache miss, fetch from RPC */ }
282            Err(e) => {
283                log::warn!("Failed to get blockhash from cache, falling back to RPC: {e}");
284            }
285        }
286
287        // Cache miss or error — fetch from RPC and cache it
288        let hash = Self::fetch_blockhash_from_rpc(rpc_client).await?;
289
290        if let Err(e) = Self::set_blockhash_in_cache(pool, &hash).await {
291            log::warn!("Failed to cache blockhash: {e}");
292            // Don't fail the request if caching fails
293        }
294
295        Ok(hash)
296    }
297
298    /// Fetch the latest blockhash directly from the Solana RPC.
299    async fn fetch_blockhash_from_rpc(rpc_client: &RpcClient) -> Result<Hash, KoraError> {
300        let (blockhash, _) = rpc_client
301            .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
302            .await
303            .map_err(|e| KoraError::RpcError(e.to_string()))?;
304        Ok(blockhash)
305    }
306
307    /// Try to read a cached blockhash from Redis.
308    async fn get_blockhash_from_cache(pool: &Pool) -> Result<Option<Hash>, KoraError> {
309        let mut conn = Self::get_connection(pool).await?;
310
311        let cached: Option<String> = conn.get(BLOCKHASH_CACHE_KEY).await.map_err(|e| {
312            KoraError::InternalServerError(format!(
313                "Failed to get blockhash from cache: {}",
314                sanitize_error!(e)
315            ))
316        })?;
317
318        match cached {
319            Some(s) => {
320                let hash = Hash::from_str(&s).map_err(|e| {
321                    KoraError::InternalServerError(format!("Failed to parse cached blockhash: {e}"))
322                })?;
323                Ok(Some(hash))
324            }
325            None => Ok(None),
326        }
327    }
328
329    /// Store a blockhash in Redis with TTL.
330    async fn set_blockhash_in_cache(pool: &Pool, hash: &Hash) -> Result<(), KoraError> {
331        let mut conn = Self::get_connection(pool).await?;
332
333        conn.set_ex::<_, _, ()>(BLOCKHASH_CACHE_KEY, hash.to_string(), BLOCKHASH_TTL)
334            .await
335            .map_err(|e| {
336                KoraError::InternalServerError(format!(
337                    "Failed to set blockhash in cache: {}",
338                    sanitize_error!(e)
339                ))
340            })?;
341
342        Ok(())
343    }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::{
        common::{create_mock_token_account, RpcMockBuilder},
        config_mock::ConfigMockBuilder,
    };

    /// With the `enabled` flag off, the cache must be reported as disabled.
    #[tokio::test]
    async fn test_is_cache_enabled_disabled() {
        let _config_guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let config = get_config().unwrap();
        let enabled = CacheUtil::is_cache_enabled(&config);

        assert!(!enabled);
    }

    /// The `enabled` flag alone is not sufficient: without a URL the cache is off.
    #[tokio::test]
    async fn test_is_cache_enabled_no_url() {
        let builder = ConfigMockBuilder::new().with_cache_enabled(true).with_cache_url(None);
        let _config_guard = builder.build_and_setup();

        let config = get_config().unwrap();
        let enabled = CacheUtil::is_cache_enabled(&config);

        assert!(!enabled);
    }

    /// Flag set and URL present: the cache is reported as enabled.
    #[tokio::test]
    async fn test_is_cache_enabled_with_url() {
        let url = Some("redis://localhost:6379".to_string());
        let _config_guard =
            ConfigMockBuilder::new().with_cache_enabled(true).with_cache_url(url).build_and_setup();

        let config = get_config().unwrap();
        let enabled = CacheUtil::is_cache_enabled(&config);

        assert!(enabled);
    }

    /// Account keys are the literal prefix `account`, a colon, then the pubkey.
    #[tokio::test]
    async fn test_get_account_key_format() {
        let pubkey = Pubkey::new_unique();

        let expected = format!("account:{pubkey}");
        let actual = CacheUtil::get_account_key(&pubkey);

        assert_eq!(actual, expected);
    }

    /// A successful RPC lookup hands back the account the mock was primed with.
    #[tokio::test]
    async fn test_get_account_from_rpc_success() {
        let pubkey = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let account = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey)
            .await
            .expect("RPC lookup should succeed");

        assert_eq!(account.lamports, expected_account.lamports);
        assert_eq!(account.owner, expected_account.owner);
    }

    /// A "not found" RPC response surfaces as `AccountNotFound` naming the pubkey.
    #[tokio::test]
    async fn test_get_account_from_rpc_error() {
        let pubkey = Pubkey::new_unique();
        let rpc_client = RpcMockBuilder::new().with_account_not_found().build();

        let error = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await.unwrap_err();

        if let KoraError::AccountNotFound(account_key) = error {
            assert_eq!(account_key, pubkey.to_string());
        } else {
            panic!("Expected AccountNotFound for account not found error");
        }
    }

    /// With caching disabled, `get_account` answers from RPC.
    #[tokio::test]
    async fn test_get_account_cache_disabled_fallback_to_rpc() {
        let _config_guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let config = get_config().unwrap();
        let account = CacheUtil::get_account(&config, &rpc_client, &pubkey, false)
            .await
            .expect("account lookup should succeed");

        assert_eq!(account.lamports, expected_account.lamports);
    }

    /// `force_refresh = true` still resolves through RPC. Caching is disabled here,
    /// so this exercises the RPC fallback path only.
    #[tokio::test]
    async fn test_get_account_force_refresh_bypasses_cache() {
        let _config_guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let config = get_config().unwrap();
        let account = CacheUtil::get_account(&config, &rpc_client, &pubkey, true)
            .await
            .expect("account lookup should succeed");

        assert_eq!(account.lamports, expected_account.lamports);
    }

    /// With caching disabled, the blockhash comes from the (mocked) RPC client.
    #[tokio::test]
    async fn test_get_or_fetch_blockhash_cache_disabled() {
        let _config_guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();
        let rpc_client = RpcMockBuilder::new().with_blockhash().build();

        let config = get_config().unwrap();
        let hash = CacheUtil::get_or_fetch_latest_blockhash(&config, &rpc_client)
            .await
            .expect("Should successfully get blockhash with cache disabled");

        assert_ne!(hash, Hash::default(), "Blockhash should not be the default hash");
    }

    /// The direct RPC helper returns a non-default hash from the mock.
    #[tokio::test]
    async fn test_fetch_blockhash_from_rpc_success() {
        let rpc_client = RpcMockBuilder::new().with_blockhash().build();

        let hash = CacheUtil::fetch_blockhash_from_rpc(&rpc_client)
            .await
            .expect("Should successfully fetch blockhash from RPC");

        assert_ne!(hash, Hash::default(), "Blockhash should not be the default hash");
    }
}