1use deadpool_redis::{Pool, Runtime};
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_sdk::{account::Account, pubkey::Pubkey};
6use tokio::sync::OnceCell;
7
8use crate::{config::Config, error::KoraError, sanitize_error};
9
10#[cfg(not(test))]
11use crate::state::get_config;
12
13#[cfg(test)]
14use crate::tests::config_mock::mock_state::get_config;
15
/// Key prefix for cached account entries; `CacheUtil::get_account_key` joins it with the
/// account's pubkey as `account:<pubkey>`.
const ACCOUNT_CACHE_KEY: &str = "account";

/// Process-wide cache pool slot, written once by `CacheUtil::init`.
///
/// The cell is unset until `init` has run. After that the inner `Option` is `Some(pool)` when
/// the cache is enabled with a URL configured, and `None` otherwise.
static CACHE_POOL: OnceCell<Option<Pool>> = OnceCell::const_new();
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CachedAccount {
24 pub account: Account,
25 pub cached_at: i64, }
27
28pub struct CacheUtil;
30
31impl CacheUtil {
32 pub async fn init() -> Result<(), KoraError> {
34 let config = get_config()?;
35
36 #[allow(clippy::needless_borrow)]
37 let pool = if CacheUtil::is_cache_enabled(&config) {
38 let redis_url = config.kora.cache.url.as_ref().ok_or(KoraError::ConfigError)?;
39
40 let cfg = deadpool_redis::Config::from_url(redis_url);
41 let pool = cfg.create_pool(Some(Runtime::Tokio1)).map_err(|e| {
42 KoraError::InternalServerError(format!(
43 "Failed to create cache pool: {}",
44 sanitize_error!(e)
45 ))
46 })?;
47
48 let mut conn = pool.get().await.map_err(|e| {
50 KoraError::InternalServerError(format!(
51 "Failed to connect to cache: {}",
52 sanitize_error!(e)
53 ))
54 })?;
55
56 let _: Option<String> = conn.get("__connection_test__").await.map_err(|e| {
58 KoraError::InternalServerError(format!(
59 "Cache connection test failed: {}",
60 sanitize_error!(e)
61 ))
62 })?;
63
64 log::info!("Cache initialized successfully");
65
66 Some(pool)
67 } else {
68 log::info!("Cache disabled or no URL configured");
69 None
70 };
71
72 CACHE_POOL.set(pool).map_err(|_| {
73 KoraError::InternalServerError("Cache pool already initialized".to_string())
74 })?;
75
76 Ok(())
77 }
78
79 async fn get_connection(pool: &Pool) -> Result<deadpool_redis::Connection, KoraError> {
80 pool.get().await.map_err(|e| {
81 KoraError::InternalServerError(format!(
82 "Failed to get cache connection: {}",
83 sanitize_error!(e)
84 ))
85 })
86 }
87
88 fn get_account_key(pubkey: &Pubkey) -> String {
89 format!("{ACCOUNT_CACHE_KEY}:{pubkey}")
90 }
91
92 async fn get_account_from_rpc(
94 rpc_client: &RpcClient,
95 pubkey: &Pubkey,
96 ) -> Result<Account, KoraError> {
97 match rpc_client.get_account(pubkey).await {
98 Ok(account) => Ok(account),
99 Err(e) => {
100 let kora_error = e.into();
101 match kora_error {
102 KoraError::AccountNotFound(_) => {
103 Err(KoraError::AccountNotFound(pubkey.to_string()))
104 }
105 other_error => Err(other_error),
106 }
107 }
108 }
109 }
110
111 async fn get_from_cache(pool: &Pool, key: &str) -> Result<Option<CachedAccount>, KoraError> {
113 let mut conn = Self::get_connection(pool).await?;
114
115 let cached_data: Option<String> = conn.get(key).await.map_err(|e| {
116 KoraError::InternalServerError(format!(
117 "Failed to get from cache: {}",
118 sanitize_error!(e)
119 ))
120 })?;
121
122 match cached_data {
123 Some(data) => {
124 let cached_account: CachedAccount = serde_json::from_str(&data).map_err(|e| {
125 KoraError::InternalServerError(format!(
126 "Failed to deserialize cached data: {e}"
127 ))
128 })?;
129 Ok(Some(cached_account))
130 }
131 None => Ok(None),
132 }
133 }
134
135 async fn get_account_from_rpc_and_cache(
137 rpc_client: &RpcClient,
138 pubkey: &Pubkey,
139 pool: &Pool,
140 ttl: u64,
141 ) -> Result<Account, KoraError> {
142 let account = Self::get_account_from_rpc(rpc_client, pubkey).await?;
143
144 let cache_key = Self::get_account_key(pubkey);
145 let cached_account =
146 CachedAccount { account: account.clone(), cached_at: chrono::Utc::now().timestamp() };
147
148 if let Err(e) = Self::set_in_cache(pool, &cache_key, &cached_account, ttl).await {
149 log::warn!("Failed to cache account {pubkey}: {e}");
150 }
152
153 Ok(account)
154 }
155
156 async fn set_in_cache(
158 pool: &Pool,
159 key: &str,
160 data: &CachedAccount,
161 ttl_seconds: u64,
162 ) -> Result<(), KoraError> {
163 let mut conn = Self::get_connection(pool).await?;
164
165 let serialized = serde_json::to_string(data).map_err(|e| {
166 KoraError::InternalServerError(format!(
167 "Failed to serialize cache data: {}",
168 sanitize_error!(e)
169 ))
170 })?;
171
172 conn.set_ex::<_, _, ()>(key, serialized, ttl_seconds).await.map_err(|e| {
173 KoraError::InternalServerError(format!(
174 "Failed to set cache data: {}",
175 sanitize_error!(e)
176 ))
177 })?;
178
179 Ok(())
180 }
181
182 fn is_cache_enabled(config: &Config) -> bool {
184 config.kora.cache.enabled && config.kora.cache.url.is_some()
185 }
186
187 pub async fn get_account(
189 config: &Config,
190 rpc_client: &RpcClient,
191 pubkey: &Pubkey,
192 force_refresh: bool,
193 ) -> Result<Account, KoraError> {
194 if !CacheUtil::is_cache_enabled(config) {
196 return Self::get_account_from_rpc(rpc_client, pubkey).await;
197 }
198
199 let pool = match CACHE_POOL.get() {
201 Some(pool) => pool,
202 None => {
203 return Self::get_account_from_rpc(rpc_client, pubkey).await;
205 }
206 };
207
208 let pool = match pool {
209 Some(pool) => pool,
210 None => {
211 return Self::get_account_from_rpc(rpc_client, pubkey).await;
213 }
214 };
215
216 if force_refresh {
217 return Self::get_account_from_rpc_and_cache(
218 rpc_client,
219 pubkey,
220 pool,
221 config.kora.cache.account_ttl,
222 )
223 .await;
224 }
225
226 let cache_key = Self::get_account_key(pubkey);
227
228 if let Ok(Some(cached_account)) = Self::get_from_cache(pool, &cache_key).await {
230 let current_time = chrono::Utc::now().timestamp();
231 let cache_age = current_time - cached_account.cached_at;
232
233 if cache_age < config.kora.cache.account_ttl as i64 {
235 return Ok(cached_account.account);
236 }
237 }
238
239 let account = Self::get_account_from_rpc_and_cache(
241 rpc_client,
242 pubkey,
243 pool,
244 config.kora.cache.account_ttl,
245 )
246 .await?;
247
248 Ok(account)
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::{
        common::{create_mock_token_account, RpcMockBuilder},
        config_mock::ConfigMockBuilder,
    };

    /// With the `enabled` flag off, the cache is reported as disabled.
    #[tokio::test]
    async fn test_is_cache_enabled_disabled() {
        let _guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let cfg = get_config().unwrap();
        let enabled = CacheUtil::is_cache_enabled(&cfg);

        assert!(!enabled);
    }

    /// With the flag on but no URL, the cache is still reported as disabled.
    #[tokio::test]
    async fn test_is_cache_enabled_no_url() {
        let builder = ConfigMockBuilder::new().with_cache_enabled(true).with_cache_url(None);
        let _guard = builder.build_and_setup();

        let cfg = get_config().unwrap();
        let enabled = CacheUtil::is_cache_enabled(&cfg);

        assert!(!enabled);
    }

    /// With the flag on and a URL present, the cache is reported as enabled.
    #[tokio::test]
    async fn test_is_cache_enabled_with_url() {
        let url = Some("redis://localhost:6379".to_string());
        let _guard =
            ConfigMockBuilder::new().with_cache_enabled(true).with_cache_url(url).build_and_setup();

        let cfg = get_config().unwrap();
        let enabled = CacheUtil::is_cache_enabled(&cfg);

        assert!(enabled);
    }

    /// Cache keys have the form `account:<pubkey>`.
    #[tokio::test]
    async fn test_get_account_key_format() {
        let pubkey = Pubkey::new_unique();

        let expected = format!("account:{pubkey}");

        assert_eq!(CacheUtil::get_account_key(&pubkey), expected);
    }

    /// A successful RPC lookup returns the account supplied by the mock.
    #[tokio::test]
    async fn test_get_account_from_rpc_success() {
        let pubkey = Pubkey::new_unique();
        let other_key = Pubkey::new_unique();
        let expected = create_mock_token_account(&pubkey, &other_key);
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected).build();

        let outcome = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;

        assert!(outcome.is_ok());
        let fetched = outcome.unwrap();
        assert_eq!(fetched.lamports, expected.lamports);
        assert_eq!(fetched.owner, expected.owner);
    }

    /// A not-found RPC response surfaces as `AccountNotFound` carrying the requested pubkey.
    #[tokio::test]
    async fn test_get_account_from_rpc_error() {
        let pubkey = Pubkey::new_unique();
        let rpc_client = RpcMockBuilder::new().with_account_not_found().build();

        let outcome = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey).await;

        assert!(outcome.is_err());
        if let Err(KoraError::AccountNotFound(account_key)) = outcome {
            assert_eq!(account_key, pubkey.to_string());
        } else {
            panic!("Expected AccountNotFound for account not found error");
        }
    }

    /// With the cache disabled, `get_account` is answered by the RPC client.
    #[tokio::test]
    async fn test_get_account_cache_disabled_fallback_to_rpc() {
        let _guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let other_key = Pubkey::new_unique();
        let expected = create_mock_token_account(&pubkey, &other_key);
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected).build();

        let cfg = get_config().unwrap();
        let outcome = CacheUtil::get_account(&cfg, &rpc_client, &pubkey, false).await;

        assert!(outcome.is_ok());
        let fetched = outcome.unwrap();
        assert_eq!(fetched.lamports, expected.lamports);
    }

    /// `force_refresh = true` still yields the RPC account.
    ///
    /// The cache is disabled in this test, so the call returns through the RPC-only branch of
    /// `get_account`.
    #[tokio::test]
    async fn test_get_account_force_refresh_bypasses_cache() {
        let _guard = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let other_key = Pubkey::new_unique();
        let expected = create_mock_token_account(&pubkey, &other_key);
        let rpc_client = RpcMockBuilder::new().with_account_info(&expected).build();

        let cfg = get_config().unwrap();
        let outcome = CacheUtil::get_account(&cfg, &rpc_client, &pubkey, true).await;

        assert!(outcome.is_ok());
        let fetched = outcome.unwrap();
        assert_eq!(fetched.lamports, expected.lamports);
    }
}