1use deadpool_redis::{Pool, Runtime};
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_sdk::{account::Account, pubkey::Pubkey};
6use tokio::sync::OnceCell;
7
8use crate::{error::KoraError, sanitize_error};
9
10#[cfg(not(test))]
11use crate::state::get_config;
12
13#[cfg(test)]
14use crate::tests::config_mock::mock_state::get_config;
15
/// Prefix of every cache key that holds an account; `get_account_key` joins it
/// to the account's pubkey with a `:` separator.
const ACCOUNT_CACHE_KEY: &str = "account";
17
/// Process-wide cache connection pool, written exactly once by `CacheUtil::init`.
///
/// The cell is unset until `init` has run. After `init` it holds `Some(pool)`
/// when caching is enabled and `None` when it is disabled.
static CACHE_POOL: OnceCell<Option<Pool>> = OnceCell::const_new();
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CachedAccount {
24 pub account: Account,
25 pub cached_at: i64, }
27
/// Stateless namespace for the account cache: all functionality is exposed as
/// associated functions, and the shared pool lives in `CACHE_POOL`.
pub struct CacheUtil;
30
31impl CacheUtil {
32 pub async fn init() -> Result<(), KoraError> {
34 let config = get_config()?;
35
36 let pool = if CacheUtil::is_cache_enabled() {
37 let redis_url = config.kora.cache.url.as_ref().ok_or(KoraError::ConfigError)?;
38
39 let cfg = deadpool_redis::Config::from_url(redis_url);
40 let pool = cfg.create_pool(Some(Runtime::Tokio1)).map_err(|e| {
41 KoraError::InternalServerError(format!(
42 "Failed to create cache pool: {}",
43 sanitize_error!(e)
44 ))
45 })?;
46
47 let mut conn = pool.get().await.map_err(|e| {
49 KoraError::InternalServerError(format!(
50 "Failed to connect to cache: {}",
51 sanitize_error!(e)
52 ))
53 })?;
54
55 let _: Option<String> = conn.get("__connection_test__").await.map_err(|e| {
57 KoraError::InternalServerError(format!(
58 "Cache connection test failed: {}",
59 sanitize_error!(e)
60 ))
61 })?;
62
63 log::info!("Cache initialized successfully");
64
65 Some(pool)
66 } else {
67 log::info!("Cache disabled or no URL configured");
68 None
69 };
70
71 CACHE_POOL.set(pool).map_err(|_| {
72 KoraError::InternalServerError("Cache pool already initialized".to_string())
73 })?;
74
75 Ok(())
76 }
77
78 async fn get_connection(pool: &Pool) -> Result<deadpool_redis::Connection, KoraError> {
79 pool.get().await.map_err(|e| {
80 KoraError::InternalServerError(format!(
81 "Failed to get cache connection: {}",
82 sanitize_error!(e)
83 ))
84 })
85 }
86
87 fn get_account_key(pubkey: &Pubkey) -> String {
88 format!("{ACCOUNT_CACHE_KEY}:{pubkey}")
89 }
90
91 async fn get_account_from_rpc(
93 rpc_client: &RpcClient,
94 pubkey: &Pubkey,
95 ) -> Result<Account, KoraError> {
96 match rpc_client.get_account(pubkey).await {
97 Ok(account) => Ok(account),
98 Err(e) => {
99 let kora_error = e.into();
100 match kora_error {
101 KoraError::AccountNotFound(_) => {
102 Err(KoraError::AccountNotFound(pubkey.to_string()))
103 }
104 other_error => Err(other_error),
105 }
106 }
107 }
108 }
109
110 async fn get_from_cache(pool: &Pool, key: &str) -> Result<Option<CachedAccount>, KoraError> {
112 let mut conn = Self::get_connection(pool).await?;
113
114 let cached_data: Option<String> = conn.get(key).await.map_err(|e| {
115 KoraError::InternalServerError(format!(
116 "Failed to get from cache: {}",
117 sanitize_error!(e)
118 ))
119 })?;
120
121 match cached_data {
122 Some(data) => {
123 let cached_account: CachedAccount = serde_json::from_str(&data).map_err(|e| {
124 KoraError::InternalServerError(format!(
125 "Failed to deserialize cached data: {e}"
126 ))
127 })?;
128 Ok(Some(cached_account))
129 }
130 None => Ok(None),
131 }
132 }
133
134 async fn get_account_from_rpc_and_cache(
136 rpc_client: &RpcClient,
137 pubkey: &Pubkey,
138 pool: &Pool,
139 ttl: u64,
140 ) -> Result<Account, KoraError> {
141 let account = Self::get_account_from_rpc(rpc_client, pubkey).await?;
142
143 let cache_key = Self::get_account_key(pubkey);
144 let cached_account =
145 CachedAccount { account: account.clone(), cached_at: chrono::Utc::now().timestamp() };
146
147 if let Err(e) = Self::set_in_cache(pool, &cache_key, &cached_account, ttl).await {
148 log::warn!("Failed to cache account {pubkey}: {e}");
149 }
151
152 Ok(account)
153 }
154
155 async fn set_in_cache(
157 pool: &Pool,
158 key: &str,
159 data: &CachedAccount,
160 ttl_seconds: u64,
161 ) -> Result<(), KoraError> {
162 let mut conn = Self::get_connection(pool).await?;
163
164 let serialized = serde_json::to_string(data).map_err(|e| {
165 KoraError::InternalServerError(format!(
166 "Failed to serialize cache data: {}",
167 sanitize_error!(e)
168 ))
169 })?;
170
171 conn.set_ex::<_, _, ()>(key, serialized, ttl_seconds).await.map_err(|e| {
172 KoraError::InternalServerError(format!(
173 "Failed to set cache data: {}",
174 sanitize_error!(e)
175 ))
176 })?;
177
178 Ok(())
179 }
180
181 fn is_cache_enabled() -> bool {
183 match get_config() {
184 Ok(config) => config.kora.cache.enabled && config.kora.cache.url.is_some(),
185 Err(_) => false,
186 }
187 }
188
189 pub async fn get_account(
191 rpc_client: &RpcClient,
192 pubkey: &Pubkey,
193 force_refresh: bool,
194 ) -> Result<Account, KoraError> {
195 let config = get_config()?;
196
197 if !CacheUtil::is_cache_enabled() {
199 return Self::get_account_from_rpc(rpc_client, pubkey).await;
200 }
201
202 let pool = match CACHE_POOL.get() {
204 Some(pool) => pool,
205 None => {
206 return Self::get_account_from_rpc(rpc_client, pubkey).await;
208 }
209 };
210
211 let pool = match pool {
212 Some(pool) => pool,
213 None => {
214 return Self::get_account_from_rpc(rpc_client, pubkey).await;
216 }
217 };
218
219 if force_refresh {
220 return Self::get_account_from_rpc_and_cache(
221 rpc_client,
222 pubkey,
223 pool,
224 config.kora.cache.account_ttl,
225 )
226 .await;
227 }
228
229 let cache_key = Self::get_account_key(pubkey);
230
231 if let Ok(Some(cached_account)) = Self::get_from_cache(pool, &cache_key).await {
233 let current_time = chrono::Utc::now().timestamp();
234 let cache_age = current_time - cached_account.cached_at;
235
236 if cache_age < config.kora.cache.account_ttl as i64 {
238 return Ok(cached_account.account);
239 }
240 }
241
242 let account = Self::get_account_from_rpc_and_cache(
244 rpc_client,
245 pubkey,
246 pool,
247 config.kora.cache.account_ttl,
248 )
249 .await?;
250
251 Ok(account)
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::{
        common::{create_mock_token_account, RpcMockBuilder},
        config_mock::ConfigMockBuilder,
    };

    /// The cache is reported as off when the config flag is false.
    #[tokio::test]
    async fn test_is_cache_enabled_disabled() {
        let _config = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        assert!(!CacheUtil::is_cache_enabled());
    }

    /// The flag alone is not enough: without a URL the cache is reported as off.
    #[tokio::test]
    async fn test_is_cache_enabled_no_url() {
        let _config = ConfigMockBuilder::new()
            .with_cache_enabled(true)
            .with_cache_url(None)
            .build_and_setup();

        assert!(!CacheUtil::is_cache_enabled());
    }

    /// With both the flag and a URL present, the cache is reported as on.
    #[tokio::test]
    async fn test_is_cache_enabled_with_url() {
        let _config = ConfigMockBuilder::new()
            .with_cache_enabled(true)
            .with_cache_url(Some("redis://localhost:6379".to_string()))
            .build_and_setup();

        assert!(CacheUtil::is_cache_enabled());
    }

    /// Keys have the form `account:<pubkey>`.
    #[tokio::test]
    async fn test_get_account_key_format() {
        let pubkey = Pubkey::new_unique();

        let expected = format!("account:{pubkey}");

        assert_eq!(CacheUtil::get_account_key(&pubkey), expected);
    }

    /// A successful RPC response is returned unchanged.
    #[tokio::test]
    async fn test_get_account_from_rpc_success() {
        let pubkey = Pubkey::new_unique();
        let mock_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&mock_account).build();

        let fetched = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey)
            .await
            .expect("RPC fetch should succeed");

        assert_eq!(fetched.lamports, mock_account.lamports);
        assert_eq!(fetched.owner, mock_account.owner);
    }

    /// A not-found RPC response becomes `AccountNotFound` carrying the
    /// requested pubkey.
    #[tokio::test]
    async fn test_get_account_from_rpc_error() {
        let pubkey = Pubkey::new_unique();
        let rpc_client = RpcMockBuilder::new().with_account_not_found().build();

        let error = CacheUtil::get_account_from_rpc(&rpc_client, &pubkey)
            .await
            .expect_err("RPC fetch should fail");

        match error {
            KoraError::AccountNotFound(account_key) => {
                assert_eq!(account_key, pubkey.to_string());
            }
            _ => panic!("Expected AccountNotFound for account not found error"),
        }
    }

    /// With the cache disabled, `get_account` is served by the RPC client.
    #[tokio::test]
    async fn test_get_account_cache_disabled_fallback_to_rpc() {
        let _config = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let mock_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&mock_account).build();

        let fetched = CacheUtil::get_account(&rpc_client, &pubkey, false)
            .await
            .expect("get_account should succeed");

        assert_eq!(fetched.lamports, mock_account.lamports);
    }

    /// `force_refresh = true` is served by the RPC client.
    ///
    /// The mock config has the cache disabled, so this covers the RPC fallback
    /// path with the flag set; it does not exercise a live cache.
    #[tokio::test]
    async fn test_get_account_force_refresh_bypasses_cache() {
        let _config = ConfigMockBuilder::new().with_cache_enabled(false).build_and_setup();

        let pubkey = Pubkey::new_unique();
        let mock_account = create_mock_token_account(&pubkey, &Pubkey::new_unique());
        let rpc_client = RpcMockBuilder::new().with_account_info(&mock_account).build();

        let fetched = CacheUtil::get_account(&rpc_client, &pubkey, true)
            .await
            .expect("get_account should succeed");

        assert_eq!(fetched.lamports, mock_account.lamports);
    }
}