1use deadpool_redis::{Pool, Runtime};
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_commitment_config::CommitmentConfig;
6use solana_sdk::{account::Account, hash::Hash, pubkey::Pubkey};
7use std::str::FromStr;
8use tokio::sync::OnceCell;
9
10use crate::{config::Config, error::KoraError, sanitize_error};
11
12#[cfg(not(test))]
13use crate::state::get_config;
14
15#[cfg(test)]
16use crate::tests::config_mock::mock_state::get_config;
17
/// Prefix for per-account cache keys; the full key is `{prefix}:{pubkey}`.
const ACCOUNT_CACHE_KEY: &str = "account";
/// Single cache key under which the latest fetched blockhash is stored.
const BLOCKHASH_CACHE_KEY: &str = "kora:blockhash";
/// Expiry, in seconds, applied to the cached blockhash entry.
const BLOCKHASH_TTL: u64 = 5;
23
24static CACHE_POOL: OnceCell<Option<Pool>> = OnceCell::const_new();
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct CachedAccount {
30 pub account: Account,
31 pub cached_at: i64, }
33
/// Stateless namespace for the cache helper functions.
pub struct CacheUtil;
36
37impl CacheUtil {
38 pub async fn init() -> Result<(), KoraError> {
40 let config = get_config()?;
41
42 #[allow(clippy::needless_borrow)]
43 let pool = if CacheUtil::is_cache_enabled(&config) {
44 let redis_url = config.kora.cache.url.as_ref().ok_or(KoraError::ConfigError(
45 "Redis URL is required when cache is enabled. Set redis_url in config".to_string(),
46 ))?;
47
48 let cfg = deadpool_redis::Config::from_url(redis_url);
49 let pool = cfg.create_pool(Some(Runtime::Tokio1)).map_err(|e| {
50 KoraError::InternalServerError(format!(
51 "Failed to create cache pool: {}",
52 sanitize_error!(e)
53 ))
54 })?;
55
56 let mut conn = pool.get().await.map_err(|e| {
58 KoraError::InternalServerError(format!(
59 "Failed to connect to cache: {}",
60 sanitize_error!(e)
61 ))
62 })?;
63
64 let _: Option<String> = conn.get("__connection_test__").await.map_err(|e| {
66 KoraError::InternalServerError(format!(
67 "Cache connection test failed: {}",
68 sanitize_error!(e)
69 ))
70 })?;
71
72 log::info!("Cache initialized successfully");
73
74 Some(pool)
75 } else {
76 log::info!("Cache disabled or no URL configured");
77 None
78 };
79
80 CACHE_POOL.set(pool).map_err(|_| {
81 KoraError::InternalServerError("Cache pool already initialized".to_string())
82 })?;
83
84 Ok(())
85 }
86
87 async fn get_connection(pool: &Pool) -> Result<deadpool_redis::Connection, KoraError> {
88 pool.get().await.map_err(|e| {
89 KoraError::InternalServerError(format!(
90 "Failed to get cache connection: {}",
91 sanitize_error!(e)
92 ))
93 })
94 }
95
96 fn get_account_key(pubkey: &Pubkey) -> String {
97 format!("{ACCOUNT_CACHE_KEY}:{pubkey}")
98 }
99
100 async fn get_account_from_rpc(
102 rpc_client: &RpcClient,
103 pubkey: &Pubkey,
104 ) -> Result<Account, KoraError> {
105 match rpc_client.get_account(pubkey).await {
106 Ok(account) => Ok(account),
107 Err(e) => {
108 let kora_error = e.into();
109 match kora_error {
110 KoraError::AccountNotFound(_) => {
111 Err(KoraError::AccountNotFound(pubkey.to_string()))
112 }
113 other_error => Err(other_error),
114 }
115 }
116 }
117 }
118
119 async fn get_from_cache(pool: &Pool, key: &str) -> Result<Option<CachedAccount>, KoraError> {
121 let mut conn = Self::get_connection(pool).await?;
122
123 let cached_data: Option<String> = conn.get(key).await.map_err(|e| {
124 KoraError::InternalServerError(format!(
125 "Failed to get from cache: {}",
126 sanitize_error!(e)
127 ))
128 })?;
129
130 match cached_data {
131 Some(data) => {
132 let cached_account: CachedAccount = serde_json::from_str(&data).map_err(|e| {
133 KoraError::InternalServerError(format!(
134 "Failed to deserialize cached data: {e}"
135 ))
136 })?;
137 Ok(Some(cached_account))
138 }
139 None => Ok(None),
140 }
141 }
142
143 async fn get_account_from_rpc_and_cache(
145 rpc_client: &RpcClient,
146 pubkey: &Pubkey,
147 pool: &Pool,
148 ttl: u64,
149 ) -> Result<Account, KoraError> {
150 let account = Self::get_account_from_rpc(rpc_client, pubkey).await?;
151
152 let cache_key = Self::get_account_key(pubkey);
153 let cached_account =
154 CachedAccount { account: account.clone(), cached_at: chrono::Utc::now().timestamp() };
155
156 if let Err(e) = Self::set_in_cache(pool, &cache_key, &cached_account, ttl).await {
157 log::warn!("Failed to cache account {pubkey}: {e}");
158 }
160
161 Ok(account)
162 }
163
164 async fn set_in_cache(
166 pool: &Pool,
167 key: &str,
168 data: &CachedAccount,
169 ttl_seconds: u64,
170 ) -> Result<(), KoraError> {
171 let mut conn = Self::get_connection(pool).await?;
172
173 let serialized = serde_json::to_string(data).map_err(|e| {
174 KoraError::InternalServerError(format!(
175 "Failed to serialize cache data: {}",
176 sanitize_error!(e)
177 ))
178 })?;
179
180 conn.set_ex::<_, _, ()>(key, serialized, ttl_seconds).await.map_err(|e| {
181 KoraError::InternalServerError(format!(
182 "Failed to set cache data: {}",
183 sanitize_error!(e)
184 ))
185 })?;
186
187 Ok(())
188 }
189
190 fn is_cache_enabled(config: &Config) -> bool {
192 config.kora.cache.enabled && config.kora.cache.url.is_some()
193 }
194
195 pub async fn get_account(
197 config: &Config,
198 rpc_client: &RpcClient,
199 pubkey: &Pubkey,
200 force_refresh: bool,
201 ) -> Result<Account, KoraError> {
202 if !CacheUtil::is_cache_enabled(config) {
204 return Self::get_account_from_rpc(rpc_client, pubkey).await;
205 }
206
207 let pool = match CACHE_POOL.get() {
209 Some(pool) => pool,
210 None => {
211 return Self::get_account_from_rpc(rpc_client, pubkey).await;
213 }
214 };
215
216 let pool = match pool {
217 Some(pool) => pool,
218 None => {
219 return Self::get_account_from_rpc(rpc_client, pubkey).await;
221 }
222 };
223
224 if force_refresh {
225 return Self::get_account_from_rpc_and_cache(
226 rpc_client,
227 pubkey,
228 pool,
229 config.kora.cache.account_ttl,
230 )
231 .await;
232 }
233
234 let cache_key = Self::get_account_key(pubkey);
235
236 if let Ok(Some(cached_account)) = Self::get_from_cache(pool, &cache_key).await {
238 let current_time = chrono::Utc::now().timestamp();
239 let cache_age = current_time - cached_account.cached_at;
240
241 if cache_age < config.kora.cache.account_ttl as i64 {
243 return Ok(cached_account.account);
244 }
245 }
246
247 let account = Self::get_account_from_rpc_and_cache(
249 rpc_client,
250 pubkey,
251 pool,
252 config.kora.cache.account_ttl,
253 )
254 .await?;
255
256 Ok(account)
257 }
258
259 pub async fn get_or_fetch_latest_blockhash(
264 config: &Config,
265 rpc_client: &RpcClient,
266 ) -> Result<Hash, KoraError> {
267 if !CacheUtil::is_cache_enabled(config) {
269 return Self::fetch_blockhash_from_rpc(rpc_client).await;
270 }
271
272 let pool = match CACHE_POOL.get() {
274 Some(Some(pool)) => pool,
275 _ => return Self::fetch_blockhash_from_rpc(rpc_client).await,
276 };
277
278 match Self::get_blockhash_from_cache(pool).await {
280 Ok(Some(hash)) => return Ok(hash),
281 Ok(None) => { }
282 Err(e) => {
283 log::warn!("Failed to get blockhash from cache, falling back to RPC: {e}");
284 }
285 }
286
287 let hash = Self::fetch_blockhash_from_rpc(rpc_client).await?;
289
290 if let Err(e) = Self::set_blockhash_in_cache(pool, &hash).await {
291 log::warn!("Failed to cache blockhash: {e}");
292 }
294
295 Ok(hash)
296 }
297
298 async fn fetch_blockhash_from_rpc(rpc_client: &RpcClient) -> Result<Hash, KoraError> {
300 let (blockhash, _) = rpc_client
301 .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
302 .await
303 .map_err(|e| KoraError::RpcError(e.to_string()))?;
304 Ok(blockhash)
305 }
306
307 async fn get_blockhash_from_cache(pool: &Pool) -> Result<Option<Hash>, KoraError> {
309 let mut conn = Self::get_connection(pool).await?;
310
311 let cached: Option<String> = conn.get(BLOCKHASH_CACHE_KEY).await.map_err(|e| {
312 KoraError::InternalServerError(format!(
313 "Failed to get blockhash from cache: {}",
314 sanitize_error!(e)
315 ))
316 })?;
317
318 match cached {
319 Some(s) => {
320 let hash = Hash::from_str(&s).map_err(|e| {
321 KoraError::InternalServerError(format!("Failed to parse cached blockhash: {e}"))
322 })?;
323 Ok(Some(hash))
324 }
325 None => Ok(None),
326 }
327 }
328
329 async fn set_blockhash_in_cache(pool: &Pool, hash: &Hash) -> Result<(), KoraError> {
331 let mut conn = Self::get_connection(pool).await?;
332
333 conn.set_ex::<_, _, ()>(BLOCKHASH_CACHE_KEY, hash.to_string(), BLOCKHASH_TTL)
334 .await
335 .map_err(|e| {
336 KoraError::InternalServerError(format!(
337 "Failed to set blockhash in cache: {}",
338 sanitize_error!(e)
339 ))
340 })?;
341
342 Ok(())
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::{
        common::{create_mock_token_account, RpcMockBuilder},
        config_mock::ConfigMockBuilder,
    };

    /// The `enabled` flag alone decides the outcome when it is off.
    #[tokio::test]
    async fn test_is_cache_enabled_disabled() {
        let _guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let cfg = get_config().unwrap();
        assert!(!CacheUtil::is_cache_enabled(&cfg));
    }

    /// Enabled but without a URL still counts as disabled.
    #[tokio::test]
    async fn test_is_cache_enabled_no_url() {
        let builder = ConfigMockBuilder::new().with_cache_enabled(true).with_cache_url(None);
        let _guard = builder.build_and_setup();

        let cfg = get_config().unwrap();
        assert!(!CacheUtil::is_cache_enabled(&cfg));
    }

    /// Enabled with a URL is the only combination reported as enabled.
    #[tokio::test]
    async fn test_is_cache_enabled_with_url() {
        let url = Some("redis://localhost:6379".to_string());
        let _guard =
            ConfigMockBuilder::new().with_cache_enabled(true).with_cache_url(url).build_and_setup();

        let cfg = get_config().unwrap();
        assert!(CacheUtil::is_cache_enabled(&cfg));
    }

    /// Account keys are the literal prefix `account:` followed by the pubkey.
    #[tokio::test]
    async fn test_get_account_key_format() {
        let pubkey = Pubkey::new_unique();
        let expected = format!("account:{pubkey}");
        assert_eq!(CacheUtil::get_account_key(&pubkey), expected);
    }

    /// A successful RPC lookup returns the mocked account unchanged.
    #[tokio::test]
    async fn test_get_account_from_rpc_success() {
        let pubkey = Pubkey::new_unique();
        let mint = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &mint);
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let result = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;

        assert!(result.is_ok());
        let fetched = result.unwrap();
        assert_eq!(fetched.lamports, expected_account.lamports);
        assert_eq!(fetched.owner, expected_account.owner);
    }

    /// A missing account is reported as `AccountNotFound` with the requested
    /// pubkey as payload.
    #[tokio::test]
    async fn test_get_account_from_rpc_error() {
        let pubkey = Pubkey::new_unique();
        let rpc_client = RpcMockBuilder::new().with_account_not_found().build();

        let result = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;

        assert!(result.is_err());
        if let KoraError::AccountNotFound(account_key) = result.unwrap_err() {
            assert_eq!(account_key, pubkey.to_string());
        } else {
            panic!("Expected AccountNotFound for account not found error");
        }
    }

    /// With the cache disabled, `get_account` goes straight to RPC.
    #[tokio::test]
    async fn test_get_account_cache_disabled_fallback_to_rpc() {
        let _guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let mint = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &mint);
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let cfg = get_config().unwrap();
        let result = CacheUtil::get_account(&cfg, &rpc_client, &pubkey, false).await;

        assert!(result.is_ok());
        let fetched = result.unwrap();
        assert_eq!(fetched.lamports, expected_account.lamports);
    }

    /// `force_refresh = true` still yields the RPC account.
    ///
    /// The cache is disabled in this setup, so no real cache is involved; the
    /// test only exercises the RPC path with the flag set.
    #[tokio::test]
    async fn test_get_account_force_refresh_bypasses_cache() {
        let _guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let mint = Pubkey::new_unique();
        let expected_account = create_mock_token_account(&pubkey, &mint);
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected_account).build();

        let cfg = get_config().unwrap();
        let result = CacheUtil::get_account(&cfg, &rpc_client, &pubkey, true).await;

        assert!(result.is_ok());
        let fetched = result.unwrap();
        assert_eq!(fetched.lamports, expected_account.lamports);
    }

    /// With the cache disabled, the blockhash comes from RPC.
    #[tokio::test]
    async fn test_get_or_fetch_blockhash_cache_disabled() {
        let _guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();
        let rpc_client = RpcMockBuilder::new().with_blockhash().build();

        let cfg = get_config().unwrap();
        let result = CacheUtil::get_or_fetch_latest_blockhash(&cfg, &rpc_client).await;

        assert!(result.is_ok(), "Should successfully get blockhash with cache disabled");
        let blockhash = result.unwrap();
        assert_ne!(blockhash, Hash::default(), "Blockhash should not be the default hash");
    }

    /// The direct RPC helper returns a non-default blockhash from the mock.
    #[tokio::test]
    async fn test_fetch_blockhash_from_rpc_success() {
        let rpc_client = RpcMockBuilder::new().with_blockhash().build();

        let result = CacheUtil::fetch_blockhash_from_rpc(&rpc_client).await;

        assert!(result.is_ok(), "Should successfully fetch blockhash from RPC");
        let blockhash = result.unwrap();
        assert_ne!(blockhash, Hash::default(), "Blockhash should not be the default hash");
    }
}